2012-12-03 6 views
7

Finora ogni volta che avevo bisogno di usare multiprocessing ho fatto così creando manualmente un "pool di processi" e condividendo una coda di lavoro con tutti i sottoprocessi.Come ottenere la quantità di "lavoro" che deve essere fatta da un pool di multiprocessing Python?

Ad esempio:

from multiprocessing import Process, Queue 


class MyClass: 

    def __init__(self, num_processes): 
     self._log   = logging.getLogger() 
     self.process_list = [] 
     self.work_queue = Queue() 
     for i in range(num_processes): 
      p_name = 'CPU_%02d' % (i+1) 
      self._log.info('Initializing process %s', p_name) 
      p = Process(target = do_stuff, 
         args = (self.work_queue, 'arg1'), 
         name = p_name) 

questo modo ho potuto aggiungere cose alla coda, che sarebbe stata consumata dai sottoprocessi. Potrei quindi monitorare quanto il trattamento è stato controllando la Queue.qsize():

while True: 
     qsize = self.work_queue.qsize() 
     if qsize == 0: 
      self._log.info('Processing finished') 
      break 
     else: 
      self._log.info('%d simulations still need to be calculated', qsize) 

Ora immagino che multiprocessing.Pool potrebbe semplificare molto questo codice.

Quello che non ho potuto scoprire è come posso monitorare la quantità di "lavoro" ancora da fare.

Prendiamo il seguente esempio:

from multiprocessing import Pool 


class MyClass: 

    def __init__(self, num_processes): 
     self.process_pool = Pool(num_processes) 
     # ... 
     result_list = [] 
     for i in range(1000):    
      result = self.process_pool.apply_async(do_stuff, ('arg1',)) 
      result_list.append(result) 
     # ---> here: how do I monitor the Pool's processing progress? 
     # ...? 

Tutte le idee?

risposta

11

Utilizzare una coda Manager. Questa è una coda condivisa tra i processi di lavoro. Se si utilizza una coda normale, verrà prelevata e annullata da ciascun worker e quindi copiata, in modo che la coda non possa essere aggiornata da ciascun worker.

Quindi i tuoi dipendenti aggiungono elementi alla coda e monitorano lo stato della coda mentre gli operai lavorano. È necessario farlo utilizzando map_async in quanto ciò consente di vedere quando l'intero risultato è pronto, consentendo di interrompere il ciclo di monitoraggio.

Esempio:

import time 
from multiprocessing import Pool, Manager 


def play_function(args): 
    """Mock function, that takes a single argument consisting 
    of (input, queue). Alternately, you could use another function 
    as a wrapper. 
    """ 
    i, q = args 
    time.sleep(0.1) # mock work 
    q.put(i) 
    return i 

p = Pool() 
m = Manager() 
q = m.Queue() 

inputs = range(20) 
args = [(i, q) for i in inputs] 
result = p.map_async(play_function, args) 

# monitor loop 
while True: 
    if result.ready(): 
     break 
    else: 
     size = q.qsize() 
     print(size) 
     time.sleep(0.1) 

outputs = result.get() 
0

Dai documenti, mi sembra che quello che vuoi fare sia raccogliere i tuoi result s in un elenco o altra sequenza, quindi iterare l'elenco dei risultati per verificare ready per creare l'elenco di output. È quindi possibile calcolare lo stato di elaborazione confrontando il numero di oggetti risultato rimanenti non in stato pronto con il numero totale di lavori inviati. Vedi http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult

1

mi si avvicinò con la soluzione qui di seguito per async_call.

Esempio di script giocattolo banale, ma dovrebbe applicarsi in generale penso.

Fondamentalmente in un ciclo infinito eseguire il polling del valore di pronto degli oggetti risultato in un generatore di elenchi e sommare per ottenere un conteggio di quante delle attività del pool inviate rimangono.

Una volta che non ci sono più pause e join() & close().

Aggiungi sleep in loop come desiderato.

Stesso principio delle soluzioni precedenti ma senza coda. Se tieni traccia di quante attività hai inizialmente inviato al Pool, puoi calcolare la percentuale completa, ecc ...

import multiprocessing 
import os 
import time 
from random import randrange 


def worker(): 
    print os.getpid() 

    #simulate work 
    time.sleep(randrange(5)) 

if __name__ == '__main__': 

    pool = multiprocessing.Pool(processes=8) 
    result_objs = [] 

    print "Begin dispatching work" 

    task_count = 10 
    for x in range(task_count): 
     result_objs.append(pool.apply_async(func=worker)) 

    print "Done dispatching work" 

    while True: 
     incomplete_count = sum(1 for x in result_objs if not x.ready()) 

     if incomplete_count == 0: 
      print "All done" 
      break 

     print str(incomplete_count) + " Tasks Remaining" 
     print str(float(task_count - incomplete_count)/task_count * 100) + "% Complete" 
     time.sleep(.25) 

    pool.close() 
    pool.join() 
1

Ho avuto lo stesso problema e si avvicinò con un po 'soluzione semplice per gli oggetti MapResult (seppur utilizzando i dati MapResult interni)

pool = Pool(POOL_SIZE) 

result = pool.map_async(get_stuff, todo) 
while not result.ready(): 
    remaining = result._number_left * result._chunksize 
    sys.stderr.write('\r\033[2KRemaining: %d' % remaining) 
    sys.stderr.flush() 
    sleep(.1) 

print >> sys.stderr, '\r\033[2KRemaining: 0' 

Si noti che il valore residuo non è sempre esatto dal momento che la le dimensioni del blocco spesso vengono arrotondate in base al numero di elementi da elaborare.

È possibile evitare questo utilizzando pool.map_async(get_stuff, todo, chunksize=1)