2010-05-19 7 views
7

Mi chiedo se questo è il modo migliore per farlo. Ho circa 500 thread che funzionano a tempo indeterminato, ma Thread.sleep per un minuto quando viene eseguito un ciclo di elaborazione.500 thread di lavoro, che tipo di pool di thread?

ExecutorService es = Executors.newFixedThreadPool(list.size()+1); 
    for (int i = 0; i < list.size(); i++) { 
     es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects 
    } 

Il codice che sta eseguendo è molto semplice e fondamentalmente solo questo

class aThread extends Thread { 
    public void run(){ 
     while(true){ 
     Thread.sleep(ONE_MINUTE); 
     //Lots of computation every minute 
     } 
    } 
} 

ho bisogno di un thread separati per ciascun compito esecuzione, così cambiando l'architettura non è un'opzione. Ho provato a rendere la mia dimensione threadPool uguale a Runtime.getRuntime(). AvailableProcessors() che ha tentato di eseguire tutti i 500 thread, ma solo di lasciarli eseguire 8 (4xhyperthreading). Gli altri thread non si arrenderanno e lasceranno che altri thread abbiano il loro turno. Ho provato a mettere in attesa() e notifica(), ma ancora senza fortuna. Se qualcuno ha un semplice esempio o qualche consiglio, sarei grato!

Bene, il design è discutibilmente imperfetto. I thread implementano Genetic-Programming o GP, un tipo di algoritmo di apprendimento. Ogni thread analizza le tendenze avanzate e fa previsioni. Se il thread si completa, l'apprendimento viene perso. Detto questo, speravo che il sonno() mi permettesse di condividere alcune delle risorse, mentre un thread non è "imparare"

Così i requisiti reali sono

come posso pianificare le attività che mantengono lo stato ed eseguono ogni 2 minuti, ma controllano quanti eseguono contemporaneamente.

+1

C'è qualche ragione particolare per cui non è possibile eseguirli in sequenza contemporaneamente? Ciò significa che sono in esecuzione contemporaneamente e che raccolgono i risultati al termine. Il problema è che richiederà sempre lo stesso tempo se lo fai con 500 thread o se lo fai con 8 thread. –

+0

Quanto dura un calcolo da solo? Quanto dura un calcolo se viene eseguito entro 500 thread? –

+2

Non mi è chiaro cosa vuoi fare. Perché non provi semplicemente a eseguire tutti i tuoi thread? Qualcosa come: for (int i ....) {((Thread) coreAppVector.elementAt (i)). Start(); } –

risposta

10

Perché non utilizzare uno ScheduledExecutorService a schedule each task to run once per minute, invece di lasciare tutti questi thread inattivi per un minuto intero?

ScheduledExecutorService workers = 
    Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); 
for (Runnable task : list) { 
    workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES); 
} 

Cosa intendi con "cambiare l'architettura non è un'opzione"? Se si intende che non è possibile modificare il proprio compito in tutti gli (in particolare, le attività devono eseguire il ciclo anziché eseguire una volta e la chiamata a Thread.sleep() non può essere rimossa), quindi "le buone prestazioni non sono un'opzione ," o.

+1

Chiaramente, non dovrebbe usare i thread, dovrebbe solo implementare 'Runnable', e ogni' run() 'dovrebbe terminare dopo un singolo calcolo. – erickson

+1

Questa sarebbe la soluzione migliore che l'OP sta cercando se ogni eseguibile non è in un ciclo perenne ed esce dal metodo di esecuzione dopo il completamento di una singola esecuzione –

13

Se i thread non terminano, questo è l'errore del codice all'interno del thread, non del pool di thread. Per assistenza più dettagliata è necessario pubblicare il codice che viene eseguito.

Inoltre, perché si mette ogni filo in sospeso quando è finito; non sarebbe meglio solo lasciarlo completare?

Inoltre, penso che si stia abusando del pool di thread avendo un numero di thread uguale al numero di attività che si desidera eseguire. Il punto di un pool di thread è di porre un limite al numero di risorse utilizzate; questo approccio non è migliore di non utilizzare affatto un pool di thread.

Infine, non è necessario passare le istanze di Thread al numero ExecutorService, solo le istanze di Runnable. ExecutorService mantiene il proprio pool di thread che si arrestano indefinitamente, estrapolando il lavoro da una coda interna (il lavoro è il Runnable s inviato).

+2

corretto. L'esecutore non ha idea che un Runnable stia dormendo. Finché il Runnable non è terminato, l'esecutore pensa che sia in esecuzione. –

3

Non sono sicuro che il codice sia semanticamente corretto nel modo in cui utilizza un pool di thread. ExecutionService crea e gestisce i thread internamente, un client deve solo fornire un'istanza di Runnable, il cui metodo run() verrà eseguito nel contesto di uno dei thread in pool. È possibile controllare my example. Si noti inoltre che ogni thread in esecuzione richiede circa 10 Mb di memoria di sistema per lo stack e su Linux la mappatura dei thread da java a nativo è 1-a-1.

2

Invece di mettere un battistrada a dormire, è necessario lasciarlo tornare e utilizzare un ThreadPoolexecutor per eseguire il lavoro inviato ogni minuto alla coda di lavoro.

-1

È necessario un semaforo.

class AThread extends Thread { 
    Semaphore sem; 
    AThread(Semaphore sem) { 
    this.sem = sem; 
    } 
    public void run(){ 
     while(true){ 
     Thread.sleep(ONE_MINUTE); 
     sem.acquire(); 
     try { 
      //Lots of computation every minute 
     } finally { 
      sem.release(); 
     } 
     } 
    } 
} 

Quando istanziare i AThreads è necessario passare alla stessa istanza del semaforo:

Semaphore sem = new Semaphore(MAX_AVAILABLE, true); 

Edit: Chi ha votato verso il basso può spiegare perché? C'è qualcosa di sbagliato nella mia soluzione?

+0

L'OP non ha menzionato nulla sulla sincronizzazione. Questo è completamente fuori tema. – SimonC

+0

L'OP ha chiesto un modo per garantire che solo un numero fisso di thread fosse "attivo" in un dato momento, e questo è un problema di sincronizzazione. Può essere che la mia soluzione non sia quella ottimale (mi piace la soluzione accettata) di sicuro non è fuori tema perché risolve il problema. –

1

8 thread è il massimo che il sistema è in grado di gestire, più e si sta rallentando con il cambio di contesto.

Guarda questo articolo http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4 Ti fornirà una panoramica di come farlo.

+0

Questo presuppone l'elaborazione associata alla CPU. Non è corretto se i thread sono legati all'I/O. –

+0

@willie Wheeler - Nei commenti per la domanda dice che sta eseguendo calcoli. Probabilmente è sicuro assumere che sia legato alla CPU. Hai letto o hai appena cantato? –

2

Per rispondere alla domanda, che tipo di pool di thread?

Ho postato i miei commenti ma questo dovrebbe risolvere il problema. Hai un calcolo che può richiedere 2 secondi per essere completato. Hai molte attività (500) che vuoi completare il più velocemente possibile. Il throughput più rapido che è possibile ottenere, supponendo che non vi sia alcun IO e/o traffico di rete, è con il numero di thread Runtime.getRuntime().availableProcessors().

Se si aumenta il numero a 500 thread, ciascuna attività verrà eseguita su una propria thread, ma il sistema operativo pianificherà un thread ogni tanto da assegnare a un altro thread. Questo è il contesto in cui si verifica un determinato punto. Ogni opzione di contesto aumenterà la quantità di tempo per l'esecuzione di ciascuna attività.

L'immagine grande qui è che l'aggiunta di più thread NON equivale a un throughput maggiore quando si supera il numero di processori.

Modifica: un rapido aggiornamento. Non hai bisogno di dormire qui. Quando esegui le 500 attività con 8 processori, ciascuna attività verrà completata nei 2 secondi, terminata e il thread su cui è in esecuzione eseguirà l'attività successiva e completerà quella.

-1

È possibile trovare alcuni miglioramenti nel throughput riducendo il numero di thread a ciò che il sistema può realisticamente gestire. Sei aperto a cambiare un po 'il design del filo? Sposterà lo schedulatore per mettere in coda i dormienti invece di avere centinaia di thread dormienti.

class RepeatingWorker implements Runnable { 

private ExecutorService executor; 
private Date lastRan; 

//constructor takes your executor 

@Override 
public void run() { 

    try { 
    if (now > lastRan + ONE_MINUTE) { 
     //do job 
     lastRan = now; 
    } else { 
     return; 
    } finally { 
    executor.submit(this); 
    } 
} 
} 

Ciò preserva il nucleo semantico del 'ripete di lavoro a tempo indeterminato, ma aspetta almeno un minuto tra le esecuzioni', ma ora si può sintonizzare il pool di thread a qualcosa la macchina in grado di gestire e quelli che non funzionano sono in coda invece di bighellonare nello scheduler come thread dormienti. C'è un po 'di pazienza se nessuno sta effettivamente facendo qualcosa, ma sto assumendo dal tuo post che l'intero scopo dell'applicazione è quello di eseguire questi thread e al momento sta inveendo i tuoi processori.Potrebbe essere necessario sintonizzarsi su questo se si deve fare spazio per altre cose :)

0

Ho bisogno di un thread separato per ogni attività in esecuzione, quindi cambiare l'architettura non è un'opzione.

Se che è true (ad esempio, effettuare una chiamata a una funzione di bloccaggio esterno), quindi creare thread separati per loro e iniziare. Non è possibile creare un pool di thread con un numero limitato di thread, poiché una funzione di blocco in uno dei thread impedirà l'inserimento di qualsiasi altro eseguibile e non aumenterà molto la creazione di un pool di thread con un thread per attività.

Ho provato a fare la mia taglia threadpool pari a Runtime.getRuntime(). AvailableProcessors(), che ha tentato di eseguire tutte le 500 le discussioni, ma lasciate solo 8 (4xhyperthreading) di eseguirli.

Quando si passano gli oggetti Thread che si stanno creando al pool di thread, si vede solo che implementano Runnable. Pertanto verrà eseguito ogni Runnable per il completamento. Qualsiasi loop che interrompe il metodo run() non consentirà l'esecuzione della successiva attività in coda; ad esempio:

public static void main (String...args) { 
    ExecutorService executor = Executors.newFixedThreadPool(2); 

    for (int i = 0; i < 10; ++i) { 
     final int task = i; 

     executor.execute(new Runnable() { 
     private long lastRunTime = 0; 
      @Override 
      public void run() { 

       for (int iteration = 0; iteration < 4;) 
       { 
        if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT) 
        { 
         // do your work here 
         ++iteration; 
         System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread()); 

         this.lastRunTime = System.currentTimeMillis(); 
        } 
        else 
        { 
         Thread.yield(); // otherwise, let other threads run 
        } 
       } 
      } 
     }); 
    } 

    executor.shutdown(); 
} 

stampe fuori:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}. 
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}. 
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}. 
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}. 
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}. 
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}. 
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}. 
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}. 
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}. 
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}. 
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}. 
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}. 
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}. 
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}. 
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}. 
... 

mostrano che il primo (filo dimensioni della piscina) attività vengono eseguite fino al completamento prima i compiti successivi vengono programmate.

Quello che devi fare è creare attività che vengono eseguite per un po ', quindi lasciare eseguire altre attività. Piuttosto come si struttura questi dipende da ciò che si vuole raggiungere

  • se si desidera che tutti i compiti da eseguire, allo stesso tempo, il tutto attendere per un minuto, poi tutti corrono allo stesso tempo nuovo, o se il compiti non sono sincronizzati tra loro
  • se si voleva davvero ogni attività per l'esecuzione a un intervallo di un minuto
  • se i vostri compiti sono potenzialmente bloccando o no, e così in realtà richiedono thread separati
  • quale comportamento è previsto se un blocco di attività più lungo della finestra prevista per l'esecuzione
  • quale comportamento ci si aspetta se un blocchi di attività più lunghi rispetto al tasso di ripetizione (blocchi per più di un minuto)

A seconda delle risposte a queste, una combinazione di ScheduledExecutorService, semafori o mutex può essere utilizzato per coordinare i compiti. Il caso più semplice è l'attività non bloccante, non sincrona, nel qual caso utilizzare direttamente ScheduledExecutorService per eseguire i runnables una volta al minuto.

+0

Come già detto fai attenzione che l'utilizzo di yield() introduca un "occupato" aspetta "e dovrebbe essere evitato. –

+0

Pensavo avessi detto che non si trattava di una "attesa indaffarata", quindi qual è? –

0

È possibile riscrivere il progetto per l'utilizzo di un framework di concurrent basato su agenti, ad esempio Akka?

1

Questo dovrebbe fare ciò che desideri, ma non quello che hai chiesto :-) Devi prendere il Thread.sleep()

ScheduledRunnable.java

import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class ScheduledRunnable 
{ 
    public static void main(final String[] args) 
    { 
     final int numTasks = 10; 
     final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); 
     for (int i = 0; i < numTasks; i++) 
     { 
      ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS); 
     } 
    } 

    private static class MyRunnable implements Runnable 
    { 
     private int id; 
     private int numRuns; 

     private MyRunnable(final int id) 
     { 
      this.id = id; 
      this.numRuns = 0; 
     } 

     @Override 
     public void run() 
     { 
      this.numRuns += 1; 
      System.out.format("%d - %d\n", this.id, this.numRuns); 
     } 
    } 
} 

Questo orari le Runnables ogni 10 secondi per mostrare il comportamento . Se è davvero necessario attendere un tempo prefissato per l'elaborazione di DOPO, potrebbe essere necessario giocare con il metodo .scheduleXXX che è necessario. Penso che fixedWait eseguirà solo ogni N quantità di tempo indipendentemente da quale sia il tempo di esecuzione.