2014-10-03 2 views
30

ho usato rosetta.parallel.pandas_easy per parallelizzare applicare dopo il gruppo, ad esempio:parallelizzare applicare dopo panda groupby

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame 
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) 
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index) 

Tuttavia, qualcuno ha capito come parallelizzare una funzione che restituisce un dataframe? Questo codice non funziona per rosetta, come previsto.

def tmpFunc(df): 
    df['c'] = df.a + df.b 
    return df 

df.groupby(df.index).apply(tmpFunc) 
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index) 

risposta

55

Questo sembra funzionare, anche se in realtà dovrebbe essere costruito per PANDAS

import pandas as pd 
from joblib import Parallel, delayed 
import multiprocessing 

def tmpFunc(df): 
    df['c'] = df.a + df.b 
    return df 

def applyParallel(dfGrouped, func): 
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped) 
    return pd.concat(retLst) 

if __name__ == '__main__': 
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) 
    print 'parallel version: ' 
    print applyParallel(df.groupby(df.index), tmpFunc) 

    print 'regular version: ' 
    print df.groupby(df.index).apply(tmpFunc) 

    print 'ideal version (does not work): ' 
    print df.groupby(df.index).applyParallel(tmpFunc) 
+0

Sai se ci sono stati progressi nell'incorporare la parallelizzazione in panda? – NumenorForLife

+1

Facendo piccola modifica alla funzione che può essere fatto per restituire l'indice gerarchico che i normali rendimenti applicare: 'def temp_func (func, nome, gruppo): ritorno func (gruppo), nome def applyParallel (dfGrouped , func): retLst, top_index = zip (* Parallel (n_jobs = multiprocessing.cpu_count()) (ritardato (temp_func) (func, nome, gruppo) per nome, gruppo in dfGrouped)) return pd.concat (retLst, keys = top_index) ' Dang, non riesco a capire come inserire il codice nei commenti ... – BoZenKhaa

+2

@ jsc123: c'è [dask] (https://github.com/blaze/dask) – paulochf

10

Ho un attacco che uso per ottenere la parallelizzazione in Panda. Rompendo il mio dataframe in blocchi, metto ciascun pezzo nell'elemento di una lista, e poi uso i bit paralleli di ipython per fare un parallelo applicare sull'elenco dei dataframes. Quindi rimetto a posto la lista usando la funzione panda concat.

Questo non è generalmente applicabile, tuttavia. Funziona per me perché la funzione che voglio applicare a ogni blocco del dataframe richiede circa un minuto. E la separazione e la raccolta dei miei dati non ci vuole molto. Quindi questo è chiaramente un kludge. Detto ciò, ecco un esempio. Sto utilizzando ipython notebook così vedrete %%time magia nel mio codice:

## make some example data 
import pandas as pd 

np.random.seed(1) 
n=10000 
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
        'data' : np.random.rand(n)}) 
grouped = df.groupby('mygroup') 

Per questo esempio ho intenzione di fare 'pezzi' in base alla GroupBy sopra, ma questo non deve essere come i dati sono suddivisi in pezzi. Anche se è un modello abbastanza comune.

dflist = [] 
for name, group in grouped: 
    dflist.append(group) 

assegnare i bit in parallelo

from IPython.parallel import Client 
rc = Client() 
lview = rc.load_balanced_view() 
lview.block = True 

scrivere la funzione stupido da applicare ai nostri dati

def myFunc(inDf): 
    inDf['newCol'] = inDf.data ** 10 
    return inDf 

ora corriamo il codice seriale quindi in parallelo. serial prima:

%%time 
serial_list = map(myFunc, dflist) 
CPU times: user 14 s, sys: 19.9 ms, total: 14 s 
Wall time: 14 s 

ora parallelo

%%time 
parallel_list = lview.map(myFunc, dflist) 

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s 
Wall time: 1.56 s 

poi ci vogliono solo pochi ms a unirli di nuovo in uno dataframe

%%time 
combinedDf = pd.concat(parallel_list) 
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms 
Wall time: 300 ms 

Io corro 6 motori ipython sul mio MacBook, ma puoi vederlo trascinare i tempi di esecuzione fino a 2 secondi dai 14 secondi.

Per simulazioni stocastiche a lunga durata, è possibile utilizzare il backend AWS attivando un cluster con StarCluster. La maggior parte delle volte, tuttavia, parallelizzo solo 8 CPU sul mio MBP.

+0

Proverò questo con il mio codice, grazie. Puoi spiegarmi perché applicare non parallelizza automaticamente le operazioni? Sembra che l'intero vantaggio di avere la funzione apply sia evitare il loop, ma se non lo fa con questi gruppi, cosa dà? – robertevansanders

+1

C'è una lunga storia sul fatto che la parallelizzazione sia difficile in Python a causa del GIL. Tieni presente che applicare è solitamente zucchero sintattico e al di sotto sta facendo il ciclo implicito. L'uso della parallelizzazione è alquanto complicato, perché i costi di runtime in parallelo ci sono che a volte annullano i vantaggi della parallelizzazione. –

+0

Esiste una definizione mancante per 'parallel_list' in quanto dà un errore' nome 'parallel_list' non è definito' in questa riga: 'combinedDf = pd.concat (parallel_list)'? – Primer

29

risposta di Ivan è grande, ma sembra che possa essere un po 'semplificata, eliminando anche la necessità di dipendere da joblib:

from multiprocessing import Pool, cpu_count 

def applyParallel(dfGrouped, func): 
    with Pool(cpu_count()) as p: 
     ret_list = p.map(func, [group for name, group in dfGrouped]) 
    return pandas.concat(ret_list) 

proposito: questo non può sostituire qualsiasi groupby.apply(), ma esso copre i casi tipici: esdovrebbe coprire i casi 2 e 3 in the documentation, mentre si dovrebbe ottenere il comportamento del caso 1 dando l'argomento axis=1 alla chiamata finale pandas.concat().

+0

Quando lo eseguo con REPL, ottengo un errore '_pickle.PicklingError: Can not pickle : attributo lookup tmpFunc su __main__ failed' ma come posso farlo con REPL? – Keiku

+0

@Keiku non ho idea, non avevo mai sentito parlare di REPL prima ... ma hai provato con '' func = lambda x: x "? Se anche questo non funziona, ti suggerisco di aprire una domanda specifica. in grado di riprodurre solo con '' applyParallel ([('one', 1), ('two', 2)], your_func) '' –

+0

Grazie per il suggerimento. Sembra che ho provato a riavviare la console e risolto. ti affligge. – Keiku

0

Un breve commento per accompagnare la risposta di JD Long. Ho scoperto che se il numero di gruppi è molto grande (diciamo centinaia di migliaia) e la tua funzione di applicazione sta facendo qualcosa di abbastanza semplice e veloce, allora suddividi il tuo dataframe in blocchi e assegnando ogni pezzo a un lavoratore per eseguire un groupby-apply (in serie) può essere molto più veloce di fare un parallelo groupby-apply e fare in modo che i lavoratori leggano una coda contenente una moltitudine di gruppi. Esempio:

import pandas as pd 
import numpy as np 
import time 
from concurrent.futures import ProcessPoolExecutor, as_completed 

nrows = 15000 
np.random.seed(1980) 
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))}) 

Così il nostro dataframe assomiglia:

a 
0 3425 
1 1016 
2 8141 
3 9263 
4 8018 

Nota quella colonna 'a' ha molti gruppi (si pensi ID cliente):

len(df.a.unique()) 
15000 

Una funzione di operare sul nostro gruppi:

def f1(group): 
    time.sleep(0.0001) 
    return group 

Start ap ool:

ppe = ProcessPoolExecutor(12) 
futures = [] 
results = [] 

fare un parallelo groupby-applicano:

%%time 

for name, group in df.groupby('a'): 
    p = ppe.submit(f1, group) 
    futures.append(p) 

for future in as_completed(futures): 
    r = future.result() 
    results.append(r) 

df_output = pd.concat(results) 
del ppe 

CPU times: user 18.8 s, sys: 2.15 s, total: 21 s 
Wall time: 17.9 s 

Vediamo ora aggiungere una colonna, che suddivide il df in molti meno gruppi:

df['b'] = np.random.randint(0, 12, nrows) 

Ora, invece di 15000 gruppi lì sono solo 12:

len(df.b.unique()) 
12 

Partizioneremo il nostro df e faremo un groupby-apply su ogni blocco.

ppe = ProcessPoolExecutor(12) 

Wrapper divertimento:

def f2(df): 
    df.groupby('a').apply(f1) 
    return df 

inviare ogni blocco da operare in serie:

%%time 

for i in df.b.unique(): 
    p = ppe.submit(f2, df[df.b==i]) 
    futures.append(p) 

for future in as_completed(futures): 
    r = future.result() 
    results.append(r) 

df_output = pd.concat(results) 

CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s 
Wall time: 12.4 s 

Si noti che la quantità di tempo spesa per gruppo non è cambiato. Piuttosto, ciò che è cambiato è la lunghezza della coda da cui i lavoratori leggono. Ho il sospetto che ciò che sta accadendo sia che i lavoratori non possono accedere simultaneamente alla memoria condivisa e stanno tornando costantemente a leggere la coda e stanno quindi calpestando le dita degli altri. Con pezzi più grandi su cui operare, i lavoratori tornano meno frequentemente e quindi questo problema è migliorato e l'esecuzione generale è più veloce.