6

Recentemente ho iniziato a sperimentare con il multiprocessing per velocizzare un'attività. Ho creato uno script che combina le stringhe fuzzy e calcola i punteggi utilizzando diversi algoritmi (volevo confrontare diverse tecniche di corrispondenza). È possibile trovare la fonte completa qui: https://bitbucket.org/bergonzzi/fuzzy-compare/src. Come input ci vogliono 2 file che vengono combinati in coppie (ogni riga di file1 con ogni riga di file2). Per ogni coppia, vengono calcolati i punteggi delle partite fuzzy.Code di multiprocessing in Python più lente di pool.map

Ho realizzato 3 versioni. Correndo con i dati di esempio forniti nel mio repo (che si compone di 697.340 articoli dopo essere stati combinati in coppie), ho le seguenti tempistiche:

  • semplice singolo processo - 0:00:47
  • Multiprocesso con piscina. map() - 0:00:13
  • Multiprocesso utilizzando code (produttore/modello consumer) - 0:01:04

sto cercando di capire perché il mio Pool.map() versione è molto più veloce rispetto alla mia versione di coda, che è in realtà più lenta di quella semplice a processo singolo.

Il mio ragionamento per il tentativo di usare le code è che la versione Pool.map() mantiene i risultati fino a quando tutto è finito e scrive solo su un file alla fine. Ciò significa che per i file di grandi dimensioni finisce per mangiare molta memoria. I'm talking about this version (collegandosi ad esso perché è molto codice da incollare qui).

To solve this I refactored it into a producer/consumer pattern (o tentato almeno). Qui prima produco lavori combinando entrambi i file di input e li inserisco in una coda che i consumatori elaborano (calcola i punteggi delle corrispondenze fuzzy). I lavori finiti vengono messi in coda. Quindi ho un singolo processo che preleva gli articoli fatti da questa coda e li scrive in un file. In teoria, in questo modo, non avrei bisogno di tanta memoria poiché i risultati verrebbero scaricati sul disco. Sembra funzionare bene ma è molto più lento. Ho anche notato che i 4 processi che sto generando non sembrano utilizzare fino al 100% della CPU quando si guarda Activity Monitor su Mac OSX (che non è il caso della versione Pool.map()).

Un'altra cosa che noto è che la mia funzione di produttore sembra riempire correttamente la coda, ma i processi di consumo sembrano attendere fino a quando la coda non viene riempita anziché iniziare a funzionare non appena arriva il primo oggetto. Probabilmente sto facendo qualcosa di sbagliato lì ...

Per riferimento ecco alcuni dei codici rilevanti per la versione della coda (anche se è meglio guardare il codice completo nel repository collegato sopra).

Ecco la mia funzione di produttore:

def combine(list1, list2): 
    ''' 
    Combine every item of list1 with every item of list 2, 
    normalize put the pair in the job queue. 
    ''' 
    pname = multiprocessing.current_process().name 
    for x in list1: 
     for y in list2: 
      # slugify is a function to normalize the strings 
      term1 = slugify(x.strip(), separator=' ') 
      term2 = slugify(y.strip(), separator=' ') 
      job_queue.put_nowait([term1, term2]) 

Questa è la funzione scrittore:

def writer(writer_queue): 
    out = open(file_out, 'wb') 
    pname = multiprocessing.current_process().name 
    out.write(header) 
    for match in iter(writer_queue.get, "STOP"): 
     print("%s is writing %s") % (pname, str(match)) 
     line = str(';'.join(match) + '\n') 
     out.write(line) 
    out.close() 

Questa è la funzione operaio che fa i calcoli effettivi (spogliato fuori la maggior parte del codice dal momento che doesn' t fare la differenza qui, fonte completa sul repository):

def score_it(job_queue, writer_queue): 
    '''Calculate scores for pair of words.''' 
    pname = multiprocessing.current_process().name 

    for pair in iter(job_queue.get_nowait, "STOP"): 
     # do all the calculations and put the result into the writer queue 
     writer_queue.put(result) 

Ecco come ho impostato i processi:

# Files 
to_match = open(args.file_to_match).readlines() 
source_list = open(args.file_to_be_matched).readlines() 

workers = 4 
job_queue = multiprocessing.Manager().Queue() 
writer_queue = multiprocessing.Manager().Queue() 
processes = [] 

print('Start matching with "%s", minimum score of %s and %s workers') % (
    args.algorithm, minscore, workers) 

# Fill up job queue 
print("Filling up job queue with term pairs...") 
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list)) 
c.start() 
c.join() 

print("Job queue size: %s") % job_queue.qsize() 

# Start writer process 
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,)) 
w.start() 

for w in xrange(workers): 
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue)) 
    p.start() 
    processes.append(p) 
    job_queue.put("STOP") 

for p in processes: 
    p.join() 

writer_queue.put("STOP") 

ho letto un po 'qui circa multiprocessing essere più lento a volte e so che questo ha a che fare con il sovraccarico di creare e gestire nuovi processi.Inoltre, quando il lavoro da svolgere non è abbastanza "grande", l'effetto del multiprocessing potrebbe non essere visibile. Comunque in questo caso penso che il lavoro sia abbastanza grande e anche la versione Pool.map() sembra dimostrarlo perché è molto più veloce.

Sto facendo qualcosa di veramente sbagliato quando si gestiscono tutti questi processi e si passano oggetti in coda? Come può essere ottimizzato in modo che i risultati possano essere scritti su un file mentre vengono elaborati al fine di ridurre al minimo la quantità di memoria richiesta durante l'esecuzione?

Grazie!

+1

Non so cosa sta succedendo con le prestazioni del tuo sistema basato su code (non ho ancora cercato troppo duro), ma per risolvere il problema di consumo della memoria della tua versione basata su 'pool, potresti provare a usare' pool.imap' per ottenere un iteratore che restituisce i valori dei risultati poiché vengono calcolati dai processi di lavoro. Nel codice 'map', basta scambiare' imap' in per 'map' e spostare le chiamate' pool.close' e ​​'pool.join' sotto il ciclo che scrive i risultati e si può essere impostato! – Blckknght

+0

Grazie mille per il suggerimento, la soluzione risolve il problema della memoria. Con imap ogni processo richiede meno di 100 mb e con la mappa sale a 1 GB. Comunque diventa più lento - sui miei dati di prova ottengo 34 secondi con imap (vs 13 secondi con mappa). Qualche idea sul perché potrebbe essere? – bergonzzi

+0

Potrebbe avere a che fare con il parametro 'chunksize'. Se capisco le cose correttamente, 'map' usa blocchi più grandi per impostazione predefinita rispetto a' imap', il che può ridurre notevolmente il sovraccarico se la sequenza da mappare è molto lunga. – Blckknght

risposta

2

Penso che il problema con i tempi è la tua versione con coda multithreaded manca un'ottimizzazione. Hai fatto un commento essenzialmente dicendo che il tuo job_queue si riempie prima che i thread di lavoro inizino a ricevere i lavori da esso. Credo che la ragione di ciò sia il c.join() che hai nella coda #Fill up job. Ciò impedisce al thread principale di continuare fino a quando la coda dei lavori è piena. Sposterei il c.join() fino alla fine dopo il p.join(). Dovrai anche trovare un modo per far arrivare i tuoi stop flag alla fine della coda. La funzione di combinazione potrebbe essere un buon posto per metterlo. Qualcosa sulla falsariga di aggiungere il numero x di stop flag dopo che è stato esaurito i dati da combinare.

Un'altra cosa da notare: Stai scrivendo su di te w variabile nell'ambito del tuo ciclo for che dà il via ai processi p. Per questioni di stile/leggibilità/etc, cambierei w con un nome di variabile diverso. Se non lo stai usando, un carattere di sottolineatura funziona come un buon nome di variabile usa e getta. I.e.

for w in xrange(workers): 

dovrebbe diventare

for _ in xrange(workers): 

farla breve, se si sposta il c.join() fino alla fine, si dovrebbe ottenere tempi più accurati. Attualmente, l'unica cosa che è multithread è la corrispondenza fuzzy delle stringhe. Uno dei vantaggi di avere un thread produttore/consumatore è che i thread del consumer non devono aspettare fino a quando il thread del produttore non è finito e, quindi, si finisce per usare meno memoria.

+0

Grazie per il suggerimento! Ha senso unirsi al processo "Job Feeder" alla fine ... Penso di essere ancora confuso su cosa sia effettivamente join(), è chiaro ora per me. Quello che ancora non capisco molto bene è il tuo suggerimento "Dovrai anche trovare un modo per far finire i tuoi stop flag alla fine della coda.La funzione di combinazione potrebbe essere un buon posto per mettere questo. linee di aggiunta del numero x di stop flag dopo che è esaurito i dati da combinare. " - cosa intendi esattamente? Perché x numero di stop flag? E come posso rilevare che non ho più dati da combinare? – bergonzzi

+0

Dato che i tuoi dati provengono da file, quando i file si esauriscono, sai che non hai più dati da combinare. I thread dei consumatori avranno bisogno di una sorta di segnale per sapere quando spegnere, quindi l'implementazione dei flag "stop". Ogni volta che un thread utente estrae un lavoro dalla coda ed è un "STOP" o un None o qualcosa di simile (dovrai scegliere cosa è meglio per la tua applicazione) il thread sa che può spegnersi. Come ho fatto su un'applicazione simile è il thread del mio produttore, una volta terminato di creare lavori, ha prodotto stop flag pari al numero di thread di consumo. –