Recentemente ho iniziato a sperimentare con il multiprocessing per velocizzare un'attività. Ho creato uno script che combina le stringhe fuzzy e calcola i punteggi utilizzando diversi algoritmi (volevo confrontare diverse tecniche di corrispondenza). È possibile trovare la fonte completa qui: https://bitbucket.org/bergonzzi/fuzzy-compare/src. Come input ci vogliono 2 file che vengono combinati in coppie (ogni riga di file1 con ogni riga di file2). Per ogni coppia, vengono calcolati i punteggi delle partite fuzzy.Code di multiprocessing in Python più lente di pool.map
Ho realizzato 3 versioni. Correndo con i dati di esempio forniti nel mio repo (che si compone di 697.340 articoli dopo essere stati combinati in coppie), ho le seguenti tempistiche:
- semplice singolo processo - 0:00:47
- Multiprocesso con piscina. map() - 0:00:13
- Multiprocesso utilizzando code (produttore/modello consumer) - 0:01:04
sto cercando di capire perché il mio Pool.map() versione è molto più veloce rispetto alla mia versione di coda, che è in realtà più lenta di quella semplice a processo singolo.
Il mio ragionamento per il tentativo di usare le code è che la versione Pool.map() mantiene i risultati fino a quando tutto è finito e scrive solo su un file alla fine. Ciò significa che per i file di grandi dimensioni finisce per mangiare molta memoria. I'm talking about this version (collegandosi ad esso perché è molto codice da incollare qui).
To solve this I refactored it into a producer/consumer pattern (o tentato almeno). Qui prima produco lavori combinando entrambi i file di input e li inserisco in una coda che i consumatori elaborano (calcola i punteggi delle corrispondenze fuzzy). I lavori finiti vengono messi in coda. Quindi ho un singolo processo che preleva gli articoli fatti da questa coda e li scrive in un file. In teoria, in questo modo, non avrei bisogno di tanta memoria poiché i risultati verrebbero scaricati sul disco. Sembra funzionare bene ma è molto più lento. Ho anche notato che i 4 processi che sto generando non sembrano utilizzare fino al 100% della CPU quando si guarda Activity Monitor su Mac OSX (che non è il caso della versione Pool.map()).
Un'altra cosa che noto è che la mia funzione di produttore sembra riempire correttamente la coda, ma i processi di consumo sembrano attendere fino a quando la coda non viene riempita anziché iniziare a funzionare non appena arriva il primo oggetto. Probabilmente sto facendo qualcosa di sbagliato lì ...
Per riferimento ecco alcuni dei codici rilevanti per la versione della coda (anche se è meglio guardare il codice completo nel repository collegato sopra).
Ecco la mia funzione di produttore:
def combine(list1, list2):
'''
Combine every item of list1 with every item of list 2,
normalize put the pair in the job queue.
'''
pname = multiprocessing.current_process().name
for x in list1:
for y in list2:
# slugify is a function to normalize the strings
term1 = slugify(x.strip(), separator=' ')
term2 = slugify(y.strip(), separator=' ')
job_queue.put_nowait([term1, term2])
Questa è la funzione scrittore:
def writer(writer_queue):
out = open(file_out, 'wb')
pname = multiprocessing.current_process().name
out.write(header)
for match in iter(writer_queue.get, "STOP"):
print("%s is writing %s") % (pname, str(match))
line = str(';'.join(match) + '\n')
out.write(line)
out.close()
Questa è la funzione operaio che fa i calcoli effettivi (spogliato fuori la maggior parte del codice dal momento che doesn' t fare la differenza qui, fonte completa sul repository):
def score_it(job_queue, writer_queue):
'''Calculate scores for pair of words.'''
pname = multiprocessing.current_process().name
for pair in iter(job_queue.get_nowait, "STOP"):
# do all the calculations and put the result into the writer queue
writer_queue.put(result)
Ecco come ho impostato i processi:
# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()
workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []
print('Start matching with "%s", minimum score of %s and %s workers') % (
args.algorithm, minscore, workers)
# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()
print("Job queue size: %s") % job_queue.qsize()
# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()
for w in xrange(workers):
p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
p.start()
processes.append(p)
job_queue.put("STOP")
for p in processes:
p.join()
writer_queue.put("STOP")
ho letto un po 'qui circa multiprocessing essere più lento a volte e so che questo ha a che fare con il sovraccarico di creare e gestire nuovi processi.Inoltre, quando il lavoro da svolgere non è abbastanza "grande", l'effetto del multiprocessing potrebbe non essere visibile. Comunque in questo caso penso che il lavoro sia abbastanza grande e anche la versione Pool.map() sembra dimostrarlo perché è molto più veloce.
Sto facendo qualcosa di veramente sbagliato quando si gestiscono tutti questi processi e si passano oggetti in coda? Come può essere ottimizzato in modo che i risultati possano essere scritti su un file mentre vengono elaborati al fine di ridurre al minimo la quantità di memoria richiesta durante l'esecuzione?
Grazie!
Non so cosa sta succedendo con le prestazioni del tuo sistema basato su code (non ho ancora cercato troppo duro), ma per risolvere il problema di consumo della memoria della tua versione basata su 'pool, potresti provare a usare' pool.imap' per ottenere un iteratore che restituisce i valori dei risultati poiché vengono calcolati dai processi di lavoro. Nel codice 'map', basta scambiare' imap' in per 'map' e spostare le chiamate' pool.close' e 'pool.join' sotto il ciclo che scrive i risultati e si può essere impostato! – Blckknght
Grazie mille per il suggerimento, la soluzione risolve il problema della memoria. Con imap ogni processo richiede meno di 100 mb e con la mappa sale a 1 GB. Comunque diventa più lento - sui miei dati di prova ottengo 34 secondi con imap (vs 13 secondi con mappa). Qualche idea sul perché potrebbe essere? – bergonzzi
Potrebbe avere a che fare con il parametro 'chunksize'. Se capisco le cose correttamente, 'map' usa blocchi più grandi per impostazione predefinita rispetto a' imap', il che può ridurre notevolmente il sovraccarico se la sequenza da mappare è molto lunga. – Blckknght