2015-05-01 3 views
25

sto analizzando alcuni dati con dataframes pyspark, supponiamo di avere un dataframe df che sto aggregazione:colonne ridenominazione per dataframes pyspark aggregati

df.groupBy("group")\ 
    .agg({"money":"sum"})\ 
    .show(100) 

Questo mi darà:

group    SUM(money#2L) 
A     137461285853 
B     172185566943 
C     271179590646 

L'aggregazione funziona bene ma non mi piace il nome della nuova colonna "SUM (money # 2L)". C'è un modo pulito per rinominare questa colonna in qualcosa di leggibile dal metodo .agg? Forse qualcosa di più simile a quello che si potrebbe fare in dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money)) 

risposta

43

Anche se io preferisco ancora dplyr sintassi, questo frammento di codice farà:

import pyspark.sql.functions as sf 

df.groupBy("group")\ 
    .agg(sf.sum('money').alias('money'))\ 
    .show(100) 

Diventa verbose.

25

withColumnRenamed dovrebbe fare il trucco. Ecco il link allo pyspark.sql API.

df.groupBy("group")\ 
    .agg({"money":"sum"})\ 
    .withColumnRenamed("SUM(money)", "money") 
    .show(100) 
3

Ho fatto una piccola funzione di supporto per questo che potrebbe aiutare alcune persone.

import re 

from functools import partial 

def rename_cols(agg_df, ignore_first_n=1): 
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe 
    and the number of aggregation columns to ignore. 
    """ 
    delimiters = "(", ")" 
    split_pattern = '|'.join(map(re.escape, delimiters)) 
    splitter = partial(re.split, split_pattern) 
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n] 
    renamed = map(split_agg, agg_df.columns[ignore_first_n:]) 
    renamed = zip(agg_df.columns[ignore_first_n:], renamed) 
    for old, new in renamed: 
     agg_df = agg_df.withColumnRenamed(old, new) 
    return agg_df 

Un esempio:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks") 
.groupby("id") 
.agg({"rank": "mean", 
     "*": "count", 
     "rate": "mean", 
     "price": "mean", 
     "clicks": "mean", 
     }) 
) 

>>> gb.columns 
['id', 
'avg(rate)', 
'count(1)', 
'avg(price)', 
'avg(rank)', 
'avg(clicks)'] 

>>> rename_cols(gb).columns 
['id', 
'avg_rate', 
'count_1', 
'avg_price', 
'avg_rank', 
'avg_clicks'] 

fare almeno un po 'per salvare la gente di digitare così tanto.

+0

Molto utile e attuale. Stavo per fare la stessa domanda. Sarebbe bello se potessi specificare un nuovo nome di colonna all'interno del dict 'agg' (all'interno di Spark intendo). –

+0

@EvanZamir ringrazia! Potrei provare a fare un semplice PR in scintilla per quello. –