2016-04-12 33 views
5

Supponiamo di avere un dataframe panda e una funzione che vorrei applicare a ciascuna riga. Posso chiamare df.apply(apply_fn, axis=1), che dovrebbe richiedere un tempo lineare nella dimensione di df. Oppure posso dividere df e usare pool.map per chiamare la mia funzione su ogni pezzo e quindi concatenare i risultati.Perché l'utilizzo del multiprocessing con i panda si ripercuote su una velocità così drammatica?

Mi aspettavo che il fattore di accelerazione dall'uso di pool.map fosse all'incirca uguale al numero di processi nel pool (new_execution_time = original_execution_time/N se si utilizzano processori N - e questo presuppone zero overhead).

Invece, in questo esempio di giocattolo, il tempo scende a circa il 2% (0,005272/0,230757) quando si utilizzano 4 processori. Mi aspettavo al massimo il 25%. Cosa sta succedendo e cosa non capisco?

import numpy as np 
from multiprocessing import Pool 
import pandas as pd 
import pdb 
import time 

n = 1000 
variables = {"hello":np.arange(n), "there":np.random.randn(n)} 
df = pd.DataFrame(variables) 

def apply_fn(series): 
    return pd.Series({"col_5":5, "col_88":88, 
         "sum_hello_there":series["hello"] + series["there"]}) 

def call_apply_fn(df): 
    return df.apply(apply_fn, axis=1) 

n_processes = 4 # My machine has 4 CPUs 
pool = Pool(processes=n_processes) 

t0 = time.process_time() 
new_df = df.apply(apply_fn, axis=1) 
t1 = time.process_time() 
df_split = np.array_split(df, n_processes) 
pool_results = pool.map(call_apply_fn, df_split) 
new_df2 = pd.concat(pool_results) 
t2 = time.process_time() 
new_df3 = df.apply(apply_fn, axis=1) # Try df.apply a second time 
t3 = time.process_time() 

print("identical results: %s" % np.all(np.isclose(new_df, new_df2))) # True 
print("t1 - t0 = %f" % (t1 - t0)) # I got 0.230757 
print("t2 - t1 = %f" % (t2 - t1)) # I got 0.005272 
print("t3 - t2 = %f" % (t3 - t2)) # I got 0.229413 

Ho salvato il codice sopra e lo ho eseguito utilizzando python3 my_filename.py.

PS Mi rendo conto che in questo esempio di giocattolo new_df può essere creato in un modo molto più semplice, senza utilizzare apply. Sono interessato ad applicare codice simile con uno più complesso apply_fn che non aggiunge solo colonne.

risposta

1

Modifica (La mia risposta precedente era in realtà sbagliata.)

time.process_time() (doc) misura il tempo solo nel processo in corso (e non comprende il tempo del sonno). Quindi il tempo trascorso nei processi figli non viene preso in considerazione.

Corro il codice con time.time(), che misura l'ora del mondo reale (senza alcuna accelerazione) e con uno più affidabile timeit.timeit (circa il 50% di accelerazione). Ho 4 core.

+0

Grazie, devi aver ragione. Però non capisco cosa stia succedendo. Perché ciò accade anche se le chiamate 'time.process_time()' sono al di fuori delle chiamate multiprocessing? È perché 'time.process_time()' sta restituendo solo la CPU del processo genitore? – Adrian

+0

@Adrian Scusate, mi sbagliavo - i processi figli non hanno avuto problemi con 'process_time'. Dispiace per la confusione. Aggiorno la risposta – ptrj