2015-11-05 7 views
5

Quindi, quello che sto cercando di fare con il seguente codice è leggere un elenco di liste e metterle attraverso la funzione e poi avere il rapporto log_result con il risultato della funzione checker. Sto cercando di farlo utilizzando il multithreading perché il nome della variabile rows_to_parse in realtà ha milioni di righe, quindi l'utilizzo di più core dovrebbe accelerare questo processo di una quantità considerevole.Elaborazione multiprocesso su pda dataframe

Il codice in questo momento non funziona e si blocca in Python.

preoccupazioni e problemi che ho:

  1. desidera che il df esistente che ha tenuto nella variabile df per mantenere l'indice per tutto il processo, perché altrimenti log_result otterrà confusi su quale riga deve essere aggiornato.
  2. Sono abbastanza sicuro che apply_async non è la funzione di elaborazione multipla appropriata per eseguire questo compito perché credo che l'ordine a cui il computer legge e scrive il df può eventualmente corromperlo ???
  3. Penso che potrebbe essere necessario impostare una coda per scrivere e leggere df ma non sono sicuro di come farei per farlo.

Grazie per l'assistenza.

import pandas as pd 
import multiprocessing 
from functools import partial 

def checker(a,b,c,d,e): 
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)] 
    index_of_match = match.index.tolist() 
    if len(index_of_match) == 1: #one match in df 
     return index_of_match 
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__: 
     return [index_of_match[0]] 
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df 
     return [a,b,c,d,e] 



def log_result(result, dataf): 
    if len(result) == 1: # 
     dataf.loc[result[0]]['e'] += 1 
    else: #append new row to exisiting df 
     new_row = pd.DataFrame([result],columns=cols) 
     dataf = dataf.append(new_row,ignore_index=True) 


def apply_async_with_callback(parsing_material, dfr): 
    pool = multiprocessing.Pool() 
    for var_a, var_b, var_c, var_d, var_e in parsing_material: 
     pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr)) 
    pool.close() 
    pool.join() 



if __name__ == '__main__': 
    #setting up main dataframe 
    cols = ['a','b','c','d','e'] 
    existing_data = [["YES","A","16052011","13031999",3], 
        ["NO","Q","11022003","15081999",3], 
        ["YES","A","22082010","03012001",9]] 

    #main dataframe 
    df = pd.DataFrame(existing_data,columns=cols) 

    #new data 
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5], 
        ['YES', 'W', '17061992', '26032012', 6], 
        ['YES', 'G', '01122006', '07082014', 2], 
        ['YES', 'N', '06081992', '21052008', 9], 
        ['YES', 'Y', '18051995', '24011996', 6], 
        ['NO', 'Q', '11022003', '15081999', 3], 
        ['NO', 'O', '20112004', '28062008', 0], 
        ['YES', 'R', '10071994', '03091996', 8], 
        ['NO', 'C', '09091998', '22051992', 1], 
        ['YES', 'Q', '01051995', '02012000', 3], 
        ['YES', 'Q', '26022015', '26092007', 5], 
        ['NO', 'F', '15072002', '17062001', 8], 
        ['YES', 'I', '24092006', '03112003', 2], 
        ['YES', 'A', '22082010', '03012001', 9], 
        ['YES', 'I', '15072016', '30092005', 7], 
        ['YES', 'Y', '08111999', '02022006', 3], 
        ['NO', 'V', '04012016', '10061996', 1], 
        ['NO', 'I', '21012003', '11022001', 6], 
        ['NO', 'P', '06041992', '30111993', 6], 
        ['NO', 'W', '30081992', '02012016', 6]] 


    apply_async_with_callback(rows_to_parse, df) 
+0

Che altro è: #no corrisponde, fornisci argomenti per scrivere a df che si suppone stia facendo? Penso che se restituisci [a, b, c, d, e] il tuo codice verrà effettivamente completato ma avrai altri problemi, non utilizzerai mai dataf ovunque –

+0

grazie per averlo indicato, ho modificato il codice. quindi '[a, b, c, d, e]' viene scritto nel df nella funzione 'log_result'. – user3374113

+0

'partial (log_result, dataf = dfr)' non corrisponde alla firma di 'log_results' – mdurant

risposta

8

Aggiornamento DataFrames come questo in multiprocessing non è andare a lavorare:

dataf = dataf.append(new_row,ignore_index=True) 

Per prima cosa questo è molto inefficiente (O (n) per ogni accoda quindi O (n^2) in Il modo preferito è concatenare alcuni oggetti insieme in un unico passaggio

Per un altro, e ancora più importante, dataf non sta bloccando per ogni aggiornamento, quindi non c'è garanzia che due operazioni non siano in conflitto (suppongo che questo è crash python)

Infine, append non agisce sul posto, quindi la variabile dataf viene scartata una volta terminata la richiamata !! e non vengono apportate modifiche al genitore dataf.


Potremmo usare MultiProcessing list o un dict. elenca se non ti interessa l'ordine o la dicitura se lo fai (ad esempio enumerare), poiché devi notare che i valori non vengono restituiti in un ordine ben definito da asincrono.
(o potremmo creare un oggetto che implementa chiuderci, vedere Eli Bendersky.)
Così sono apportate le seguenti modifiche:

df = pd.DataFrame(existing_data,columns=cols) 
# becomes 
df = pd.DataFrame(existing_data,columns=cols) 
d = MultiProcessing.list([df]) 

dataf = dataf.append(new_row,ignore_index=True) 
# becomes 
d.append(new_row) 

Ora, una volta che l'async ha finito di avere un MultiProcessing.list di DataFrames. È possibile concat questi (e ignore_index) per ottenere il risultato desiderato:

pd.concat(d, ignore_index=True) 

dovrebbe fare il trucco.


Nota: la creazione del dataframe newRow in ogni fase è anche meno efficiente che lasciando i panda analizzare l'elenco delle liste direttamente ad un dataframe in un colpo solo. Speriamo che questo sia un esempio di giocattolo, in realtà vuoi che i tuoi pezzi siano abbastanza grandi per ottenere vittorie con MultiProcessing (ho sentito 50kb come regola empirica ...), una riga alla volta non sarà mai una vinci qui


parte: Si dovrebbe evitare l'uso di variabili globali (come df) nel codice, è molto più pulito di passarli in giro per le funzioni (in questo caso, come argomento a checker).