2014-11-06 7 views
12

Sto provando a utilizzare la multiprocessing con pandas dataframe, ovvero il dataframe diviso in 8 parti. applica alcune funzioni a ciascuna parte usando apply (con ogni parte elaborata in processi diversi).si applica il multiprocessing panda

EDIT: Ecco la soluzione che ho finalmente trovato:

import multiprocessing as mp 
import pandas.util.testing as pdt 

def process_apply(x): 
    # do some stuff to data here 

def process(df): 
    res = df.apply(process_apply, axis=1) 
    return res 

if __name__ == '__main__': 
    p = mp.Pool(processes=8) 
    split_dfs = np.array_split(big_df,8) 
    pool_results = p.map(aoi_proc, split_dfs) 
    p.close() 
    p.join() 

    # merging parts processed by different processes 
    parts = pd.concat(pool_results, axis=0) 

    # merging newly calculated parts to big_df 
    big_df = pd.concat([big_df, parts], axis=1) 

    # checking if the dfs were merged correctly 
    pdt.assert_series_equal(parts['id'], big_df['id']) 
+0

c'è uno spazio in 'res = df.apply (processo applicato, asse = 1)', giusto? –

+1

@yemu cosa stai cercando esattamente di ottenere con questo codice? – Dalek

+0

attualmente applicato solo satura un core della CPU. Voglio utilizzare il multiprocesso e utilizzare tutti i core per ridurre i tempi di elaborazione – yemu

risposta

3

Dal momento che non ho molto dello script di dati, questo è una supposizione, ma io suggerirei usando p.map invece che con l'apply_async richiama.

p = mp.Pool(8) 
pool_results = p.map(process, np.array_split(big_df,8)) 
p.close() 
p.join() 
results = [] 
for result in pool_results: 
    results.extend(result) 
+0

@yemu ha fatto questo lavoro per te? –

+0

Ho dovuto mettere la chiamata dentro se __name__ == '__main__'. e con altre piccole modifiche sono riuscito a farlo funzionare, tuttavia non sono sicuro che i dati dei risultati nei risultati del pool vengano restituiti nello stesso ordine in cui sono stati suddivisi. Devo controllarlo – yemu

+0

vedere qui per una soluzione con 'dask' https://stackoverflow.com/questions/37979167/how-to-parallelize-many-fuzzy-string-comparisons-using-apply-in-pandas –

0

Ho anche eseguito nello stesso problema quando uso multiprocessing.map() applicare funzione differente pezzo di un grande dataframe.

Voglio solo aggiungere diversi punti nel caso in cui altre persone incontrano il mio stesso problema.

  1. ricordarsi di aggiungere if __name__ == '__main__':
  2. eseguire il file in un file .py, se si utilizza ipython/jupyter notebook, allora non si può eseguire multiprocessing (questo è vero per il mio caso, anche se non ho idea)