2016-02-06 19 views
6

Ho alcuni file csv piuttosto grandi (~ 10 gb) e vorrei sfruttare Dask per l'analisi. Tuttavia, a seconda del numero di partizioni con cui ho impostato l'oggetto dask con cui leggere, i risultati del mio gruppo cambiano. La mia comprensione è stata che dask ha approfittato delle partizioni per i vantaggi dell'elaborazione out-of-core, ma che avrebbe comunque restituito l'output di groupby appropriato. Questo non sembra essere il caso e sto lottando per capire quali sono le impostazioni alternative necessarie. Di seguito è riportato un piccolo esempio:Dask DataFrame Groupby Partitions

df = pd.DataFrame({'A': np.arange(100), 'B': np.random.randn(100), 'C': np.random.randn(100), 'Grp1': np.repeat([1, 2], 50), 'Grp2': [3, 4, 5, 6], 25)}) 

test_dd1 = dd.from_pandas(df, npartitions=1) 
test_dd2 = dd.from_pandas(df, npartitions=2) 
test_dd5 = dd.from_pandas(df, npartitions=5) 
test_dd10 = dd.from_pandas(df, npartitions=10) 
test_dd100 = dd.from_pandas(df, npartitions=100) 

def test_func(x): 
    x['New_Col'] = len(x[x['B'] > 0.])/len(x['B']) 
    return x 

test_dd1.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head() 
    A    B    C Grp1 Grp2 New_Col 
0 0 -0.561376 -1.422286  1  3  0.48 
1 1 -1.107799 1.075471  1  3  0.48 
2 2 -0.719420 -0.574381  1  3  0.48 
3 3 -1.287547 -0.749218  1  3  0.48 
4 4 0.677617 -0.908667  1  3  0.48 

test_dd2.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head() 
    A    B    C Grp1 Grp2 New_Col 
0 0 -0.561376 -1.422286  1  3  0.48 
1 1 -1.107799 1.075471  1  3  0.48 
2 2 -0.719420 -0.574381  1  3  0.48 
3 3 -1.287547 -0.749218  1  3  0.48 
4 4 0.677617 -0.908667  1  3  0.48 

test_dd5.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head() 
    A    B    C Grp1 Grp2 New_Col 
0 0 -0.561376 -1.422286  1  3  0.45 
1 1 -1.107799 1.075471  1  3  0.45 
2 2 -0.719420 -0.574381  1  3  0.45 
3 3 -1.287547 -0.749218  1  3  0.45 
4 4 0.677617 -0.908667  1  3  0.45 

test_dd10.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head() 
    A    B    C Grp1 Grp2 New_Col 
0 0 -0.561376 -1.422286  1  3  0.5 
1 1 -1.107799 1.075471  1  3  0.5 
2 2 -0.719420 -0.574381  1  3  0.5 
3 3 -1.287547 -0.749218  1  3  0.5 
4 4 0.677617 -0.908667  1  3  0.5 

test_dd100.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head() 
    A    B    C Grp1 Grp2 New_Col 
0 0 -0.561376 -1.422286  1  3  0 
1 1 -1.107799 1.075471  1  3  0 
2 2 -0.719420 -0.574381  1  3  0 
3 3 -1.287547 -0.749218  1  3  0 
4 4 0.677617 -0.908667  1  3  1 

df.groupby(['Grp1', 'Grp2']).apply(test_func).head() 
    A    B    C Grp1 Grp2 New_Col 
0 0 -0.561376 -1.422286  1  3  0.48 
1 1 -1.107799 1.075471  1  3  0.48 
2 2 -0.719420 -0.574381  1  3  0.48 
3 3 -1.287547 -0.749218  1  3  0.48 
4 4 0.677617 -0.908667  1  3  0.48 

fa il passaggio groupby operano solo all'interno di ogni partizione, piuttosto che guardando oltre la piena dataframe? In questo caso è banale impostare npartitions = 1 e non sembra influire molto sulle prestazioni, ma poiché read_csv imposta automaticamente un certo numero di partizioni come si configura la chiamata per garantire che i risultati di groupby siano accurati?

Grazie!

+0

Il mio primo pensiero è che groupby/apply di dask potrebbe non garantire l'ordine dei risultati, ma potrebbero essere tutti comunque presenti. – shoyer

+0

Sì, stavo pensando anche a questo, ma ho fatto diverse sezioni univoche e i risultati all'interno del gruppo finiscono con l'aumentare del conteggio delle partizioni. In un singolo set di "grp1/grp2" unici, ad esempio, ci sarebbero 2 valori diversi. – Bhage

+0

Qualche risoluzione a questo problema? – codingknob

risposta

2

Sono sorpreso da questo risultato. Groupby.apply dovrebbe restituire gli stessi risultati indipendentemente dal numero di partizioni. Se riesci a fornire un esempio riproducibile, ti incoraggio a raise an issue e uno degli sviluppatori darà un'occhiata.