2015-02-06 27 views
8

Ho una funzione che esegue alcune simulazioni e restituisce una matrice in formato stringa.Elaborazione multipla Python - monitoraggio del processo di funzionamento pool.map

Desidero eseguire la simulazione (la funzione) per variando i valori dei parametri di input, oltre 10000 valori di input possibili, e scrivere i risultati in un singolo file.

Sto usando multiprocessing, in particolare, la funzione pool.map per eseguire le simulazioni in parallelo.

Poiché l'intero processo di esecuzione della funzione di simulazione oltre 10000 volte richiede molto tempo, mi piacerebbe davvero monitorare il processo dell'intera operazione.

Penso che il problema nel mio codice corrente di seguito sia che, pool.map esegue la funzione 10000 volte, senza alcun tracciamento del processo durante tali operazioni. Una volta che l'elaborazione parallela ha terminato l'esecuzione di 10000 simulazioni (potrebbe essere di ore in giorni.), Quindi continuo a tenere traccia quando 10000 risultati di simulazione vengono salvati in un file. Quindi non si tratta proprio dell'elaborazione dell'operazione pool.map.

C'è una soluzione facile al mio codice che consentirà il tracciamento del processo?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

risposta

7

Se si utilizza una funzione iterata map, è abbastanza facile tenere traccia dei progressi.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> def simFunction(x,y): 
... import time 
... time.sleep(2) 
... return x**2 + y 
... 
>>> x,y = range(100),range(-100,100,2) 
>>> res = Pool().imap(simFunction, x,y) 
>>> with open('results.txt', 'w') as out: 
... for i in x: 
...  out.write("%s\n" % res.next()) 
...  if i%10 is 0: 
...  print "%s of %s simulated" % (i, len(x)) 
... 
0 of 100 simulated 
10 of 100 simulated 
20 of 100 simulated 
30 of 100 simulated 
40 of 100 simulated 
50 of 100 simulated 
60 of 100 simulated 
70 of 100 simulated 
80 of 100 simulated 
90 of 100 simulated 

In alternativa, è possibile utilizzare un asincrono map. Qui farò le cose in modo un po 'diverso, solo per mescolare.

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

Si noti che sto usando pathos.multiprocessing invece di multiprocessing. È solo un fork di multiprocessing che consente di eseguire le funzioni map con più input, ha una serializzazione molto migliore e consente di eseguire chiamate map ovunque (non solo in __main__). Potresti usare multiprocessing per fare anche questo, tuttavia il codice sarebbe leggermente diverso.

O uno iterato o asincrono map consentirà di scrivere qualsiasi codice che si desidera fare meglio il tracciamento del processo.Ad esempio, passa un "id" univoco a ciascun lavoro e guarda che ritorna o che ogni lavoro restituisce il suo id di processo. Ci sono molti modi per tenere traccia dei progressi e dei processi ... ma quanto sopra dovrebbe darti un inizio.

È possibile ottenere pathos qui: https://github.com/uqfoundation

+0

grazie mille! – user32147

3

Non esiste una "soluzione facile". map significa nascondere i dettagli di implementazione da te. E in questo caso si desidera i dettagli. Cioè, le cose diventano un po 'più complesse, per definizione. Hai bisogno di cambiare il paradigma della comunicazione. Ci sono molti modi per farlo.

Uno è: creare una coda per raccogliere i risultati e consentire ai dipendenti di inserire risultati in questa coda. È quindi possibile, all'interno di un thread o processo di monitoraggio, esaminare la coda e consumare i risultati non appena arrivano. Durante il consumo, è possibile analizzarli e generare output di registro. Questo potrebbe essere il modo più generale per tenere traccia dei progressi: puoi rispondere ai risultati in entrata in qualsiasi modo, in tempo reale.

Un modo più semplice potrebbe essere quello di modificare leggermente la funzione di lavoro e generare output di registro. Analizzando attentamente l'output del registro con strumenti esterni (come ad esempio grep e wc), è possibile trovare un modo molto semplice per tenere traccia.

+1

grazie. potresti fornire qualche semplice esempio per favore? – user32147

3

Penso che quello che vi serve è un file di log .

Si consiglia di utilizzare il modulo di registrazione che fa parte della libreria standard Python. Ma sfortunatamente la registrazione non è multiprocessing-safe. Quindi non puoi usarlo immediatamente nella tua app.

Quindi, sarà necessario utilizzare un gestore di log sicuro per la multiprocessing o implementare il proprio utilizzando una coda o blocchi insieme al modulo di registrazione .

C'è un sacco di discussioni su questo in Stackoverflow. Questo, per esempio: How should I log while using multiprocessing in Python?

Se la maggior parte del carico della CPU è in funzione di simulazione e non si intende utilizzare la rotazione dei log, probabilmente si può utilizzare un semplice meccanismo di blocco in questo modo:

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

Puoi "tail -f" il tuo file di registro durante l'esecuzione. Questo è ciò che dovresti vedere:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

Provato su Windows e Linux.

Spero che questo aiuti

+0

'multiprocessing.get_logger()' restituisce un logger con funzionalità limitate protetto da blocchi, vedere https://docs.python.org/2/library/multiprocessing.html#logging –

+0

Sì, ma questo è il logger del modulo ... il tuo log verrà mixato con i messaggi a livello di modulo: Provalo e vedrai messaggi come questo: 2015-02-08 23: 47: 10,954 9288 DEBUG creato semlock con handle 448 –

+0

Oh, sei vero, non l'ho mai usato in realtà e ho sfogliato i documenti troppo velocemente. –