Quindi, quello che sto cercando di fare con il seguente codice è leggere un elenco di liste e metterle attraverso la funzione e poi avere il rapporto log_result
con il risultato della funzione checker
. Sto cercando di farlo utilizzando il multithreading perché il nome della variabile rows_to_parse
in realtà ha milioni di righe, quindi l'utilizzo di più core dovrebbe accelerare questo processo di una quantità considerevole.Elaborazione multiprocesso su pda dataframe
Il codice in questo momento non funziona e si blocca in Python.
preoccupazioni e problemi che ho:
- desidera che il df esistente che ha tenuto nella variabile
df
per mantenere l'indice per tutto il processo, perché altrimentilog_result
otterrà confusi su quale riga deve essere aggiornato. - Sono abbastanza sicuro che
apply_async
non è la funzione di elaborazione multipla appropriata per eseguire questo compito perché credo che l'ordine a cui il computer legge e scrive il df può eventualmente corromperlo ??? - Penso che potrebbe essere necessario impostare una coda per scrivere e leggere
df
ma non sono sicuro di come farei per farlo.
Grazie per l'assistenza.
import pandas as pd
import multiprocessing
from functools import partial
def checker(a,b,c,d,e):
match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
index_of_match = match.index.tolist()
if len(index_of_match) == 1: #one match in df
return index_of_match
elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__:
return [index_of_match[0]]
else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
return [a,b,c,d,e]
def log_result(result, dataf):
if len(result) == 1: #
dataf.loc[result[0]]['e'] += 1
else: #append new row to exisiting df
new_row = pd.DataFrame([result],columns=cols)
dataf = dataf.append(new_row,ignore_index=True)
def apply_async_with_callback(parsing_material, dfr):
pool = multiprocessing.Pool()
for var_a, var_b, var_c, var_d, var_e in parsing_material:
pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
pool.close()
pool.join()
if __name__ == '__main__':
#setting up main dataframe
cols = ['a','b','c','d','e']
existing_data = [["YES","A","16052011","13031999",3],
["NO","Q","11022003","15081999",3],
["YES","A","22082010","03012001",9]]
#main dataframe
df = pd.DataFrame(existing_data,columns=cols)
#new data
rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
['YES', 'W', '17061992', '26032012', 6],
['YES', 'G', '01122006', '07082014', 2],
['YES', 'N', '06081992', '21052008', 9],
['YES', 'Y', '18051995', '24011996', 6],
['NO', 'Q', '11022003', '15081999', 3],
['NO', 'O', '20112004', '28062008', 0],
['YES', 'R', '10071994', '03091996', 8],
['NO', 'C', '09091998', '22051992', 1],
['YES', 'Q', '01051995', '02012000', 3],
['YES', 'Q', '26022015', '26092007', 5],
['NO', 'F', '15072002', '17062001', 8],
['YES', 'I', '24092006', '03112003', 2],
['YES', 'A', '22082010', '03012001', 9],
['YES', 'I', '15072016', '30092005', 7],
['YES', 'Y', '08111999', '02022006', 3],
['NO', 'V', '04012016', '10061996', 1],
['NO', 'I', '21012003', '11022001', 6],
['NO', 'P', '06041992', '30111993', 6],
['NO', 'W', '30081992', '02012016', 6]]
apply_async_with_callback(rows_to_parse, df)
Che altro è: #no corrisponde, fornisci argomenti per scrivere a df che si suppone stia facendo? Penso che se restituisci [a, b, c, d, e] il tuo codice verrà effettivamente completato ma avrai altri problemi, non utilizzerai mai dataf ovunque –
grazie per averlo indicato, ho modificato il codice. quindi '[a, b, c, d, e]' viene scritto nel df nella funzione 'log_result'. – user3374113
'partial (log_result, dataf = dfr)' non corrisponde alla firma di 'log_results' – mdurant