Un breve commento per accompagnare la risposta di JD Long. Ho scoperto che se il numero di gruppi è molto grande (diciamo centinaia di migliaia) e la tua funzione di applicazione sta facendo qualcosa di abbastanza semplice e veloce, allora suddividi il tuo dataframe in blocchi e assegnando ogni pezzo a un lavoratore per eseguire un groupby-apply (in serie) può essere molto più veloce di fare un parallelo groupby-apply e fare in modo che i lavoratori leggano una coda contenente una moltitudine di gruppi. Esempio:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
Così il nostro dataframe assomiglia:
a
0 3425
1 1016
2 8141
3 9263
4 8018
Nota quella colonna 'a' ha molti gruppi (si pensi ID cliente):
len(df.a.unique())
15000
Una funzione di operare sul nostro gruppi:
def f1(group):
time.sleep(0.0001)
return group
Start ap ool:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
fare un parallelo groupby-applicano:
%%time
for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Vediamo ora aggiungere una colonna, che suddivide il df in molti meno gruppi:
df['b'] = np.random.randint(0, 12, nrows)
Ora, invece di 15000 gruppi lì sono solo 12:
len(df.b.unique())
12
Partizioneremo il nostro df e faremo un groupby-apply su ogni blocco.
ppe = ProcessPoolExecutor(12)
Wrapper divertimento:
def f2(df):
df.groupby('a').apply(f1)
return df
inviare ogni blocco da operare in serie:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Si noti che la quantità di tempo spesa per gruppo non è cambiato. Piuttosto, ciò che è cambiato è la lunghezza della coda da cui i lavoratori leggono. Ho il sospetto che ciò che sta accadendo sia che i lavoratori non possono accedere simultaneamente alla memoria condivisa e stanno tornando costantemente a leggere la coda e stanno quindi calpestando le dita degli altri. Con pezzi più grandi su cui operare, i lavoratori tornano meno frequentemente e quindi questo problema è migliorato e l'esecuzione generale è più veloce.
Sai se ci sono stati progressi nell'incorporare la parallelizzazione in panda? – NumenorForLife
Facendo piccola modifica alla funzione che può essere fatto per restituire l'indice gerarchico che i normali rendimenti applicare: 'def temp_func (func, nome, gruppo): ritorno func (gruppo), nome def applyParallel (dfGrouped , func): retLst, top_index = zip (* Parallel (n_jobs = multiprocessing.cpu_count()) (ritardato (temp_func) (func, nome, gruppo) per nome, gruppo in dfGrouped)) return pd.concat (retLst, keys = top_index) ' Dang, non riesco a capire come inserire il codice nei commenti ... – BoZenKhaa
@ jsc123: c'è [dask] (https://github.com/blaze/dask) – paulochf