2012-11-19 8 views
32

Sto cercando di risolvere un grosso problema numerico che coinvolge molti sottoproblemi e sto utilizzando il modulo di multiprocessing di Python (in particolare Pool.map) per suddividere diversi sottoproblemi indipendenti su diversi core. Ogni sottoproblema comporta il calcolo di molti sottoprobleble e sto cercando di memorizzare efficacemente questi risultati memorizzandoli in un file se non sono ancora stati calcolati da alcun processo, altrimenti salta il calcolo e leggi i risultati dal file.Elaborazione multiprocesso in Python in modo sicuro in un file

Sono in corso problemi di concorrenza con i file: diversi processi a volte controllano se un sotto-sottoproblema è stato ancora calcolato (cercando il file in cui verranno archiviati i risultati), vedere che non lo ha, eseguire il calcolo, quindi provare a scrivere i risultati nello stesso file allo stesso tempo. Come evito di scrivere collisioni come questa?

+3

Partenza un esempio dalla documentazione di usare [ 'multiprocessing.Lock'] (http://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes) per sincronizzare multipla processi. –

+11

Si potrebbe avere un unico processo di scrittura dei risultati, con una coda come input che potrebbe essere alimentata dagli altri processi di lavoro. Credo che sarebbe sicuro avere tutti i processi di lavoro di sola lettura. – GP89

+0

Avrei dovuto dire che, per rendere le cose più complicate, sto eseguendo più grossi problemi principali nello stesso momento su un cluster, con ciascuno dei quali scrive risultati in sottoproblemi sullo stesso file system di rete. Quindi posso ottenere collisioni da processi eseguiti su macchine separate interamente (quindi non penso che le soluzioni che usano cose come il multiprocessing.Lock funzioneranno). –

risposta

63

@ GP89 ha menzionato una buona soluzione. Utilizzare una coda per inviare le attività di scrittura a un processo dedicato che ha il solo accesso in scrittura al file. Tutti gli altri lavoratori hanno accesso di sola lettura. Questo eliminerà le collisioni. Ecco un esempio che utilizza apply_async, ma funzionerà con la mappa troppo:

import multiprocessing as mp 
import time 

fn = 'c:/temp/temp.txt' 

def worker(arg, q): 
    '''stupidly simulates long running process''' 
    start = time.clock() 
    s = 'this is a test' 
    txt = s 
    for i in xrange(200000): 
     txt += s 
    done = time.clock() - start 
    with open(fn, 'rb') as f: 
     size = len(f.read()) 
    res = 'Process' + str(arg), str(size), done 
    q.put(res) 
    return res 

def listener(q): 
    '''listens for messages on the q, writes to file. ''' 

    f = open(fn, 'wb') 
    while 1: 
     m = q.get() 
     if m == 'kill': 
      f.write('killed') 
      break 
     f.write(str(m) + '\n') 
     f.flush() 
    f.close() 

def main(): 
    #must use Manager queue here, or will not work 
    manager = mp.Manager() 
    q = manager.Queue()  
    pool = mp.Pool(mp.cpu_count() + 2) 

    #put listener to work first 
    watcher = pool.apply_async(listener, (q,)) 

    #fire off workers 
    jobs = [] 
    for i in range(80): 
     job = pool.apply_async(worker, (i, q)) 
     jobs.append(job) 

    # collect results from the workers through the pool result queue 
    for job in jobs: 
     job.get() 

    #now we are done, kill the listener 
    q.put('kill') 
    pool.close() 

if __name__ == "__main__": 
    main() 

buona fortuna,

Mike

+1

Hey Mike, grazie per la risposta. Penso che questo avrebbe funzionato per la domanda come l'avevo formulata, ma non sono così sicuro se risolverà il problema completo come delineato nei commenti alla domanda, in particolare in che modo ho diversi programmi principali in esecuzione su più macchine su una rete filesystem, ognuno dei quali potrebbe avere processi che proveranno a scrivere sullo stesso file. (FWIW, ho aggirato il mio problema personale in modo hacky qualche tempo fa, ma sto commentando nel caso in cui altri abbiano problemi simili.) –

+1

Mi piacerebbe davvero sovvertire questo numero di volte. Questo è stato utile così tante volte per me. Ancora una volta oggi. – Eduardo

+0

Grazie Mike: avevo difficoltà a usare le code MP. Il tuo esempio lo rende molto chiaro e diretto. – Anurag

0

Sembra a me che è necessario utilizzare Manager per salvare temporaneamente i risultati a un elenco e quindi scrivere i risultati dall'elenco in un file. Inoltre, utilizzare starmap per passare l'oggetto che si desidera elaborare e l'elenco gestito. Il primo passo è costruire il parametro da passare a starmap, che include l'elenco gestito.

from multiprocessing import Manager 
from multiprocessing import Pool 
import pandas as pd``` 

def worker(row, param): 
    # do something here and then append it to row 
    x = param**2 
    row.append(x) 

if __name__ == '__main__': 
    pool_parameter = [] # list of objects to process 
    with Manager() as mgr: 
     row = mgr.list([]) 

     # build list of parameters to send to starmap 
     for param in pool_parameter: 
      params.append([row,param]) 

     with Pool() as p: 
      p.starmap(worker, params) 

Da questo punto è necessario decidere come gestire l'elenco. Se hai tonnellate di RAM e un enorme set di dati non esitare a concatenare usando i panda. Quindi puoi salvare facilmente il file come csv o pickle.

 df = pd.concat(row, ignore_index=True) 

     df.to_pickle('data.pickle') 
     df.to_csv('data.csv')