2016-07-14 125 views
5

Ho i dati nella dataframe come di seguito:come ottenere il massimo (data) dal set dato di dati raggruppati da alcuni campi usando pyspark?

datetime    | userId | memberId | value |  
2016-04-06 16:36:... | 1234 | 111  | 1 
2016-04-06 17:35:... | 1234 | 222  | 5 
2016-04-06 17:50:... | 1234 | 111  | 8 
2016-04-06 18:36:... | 1234 | 222  | 9 
2016-04-05 16:36:... | 4567 | 111  | 1 
2016-04-06 17:35:... | 4567 | 222  | 5 
2016-04-06 18:50:... | 4567 | 111  | 8 
2016-04-06 19:36:... | 4567 | 222  | 9 

ho bisogno di trovare il massimo (datetime) groupby userid, memberId. Quando ho provato, come di seguito:

df2 = df.groupBy('userId','memberId').max('datetime') 

sto ottenendo errore come:

org.apache.spark.sql.AnalysisException: "datetime" is not a numeric 
column. Aggregation function can only be applied on a numeric column.; 

L'uscita ho desiderato è la seguente:

userId | memberId | datetime 
1234 | 111  | 2016-04-06 17:50:... 
1234 | 222  | 2016-04-06 18:36:... 
4567 | 111  | 2016-04-06 18:50:... 
4567 | 222  | 2016-04-06 19:36:... 

Per favore qualcuno può aiutarmi come ottenere la data massima tra i dati dati usando i datafram PySpark?

risposta

7

Per non numerico ma Orderable tipi è possibile utilizzare agg con max direttamente:

from pyspark.sql.functions import col, max as max_ 

df = sc.parallelize([ 
    ("2016-04-06 16:36", 1234, 111, 1), 
    ("2016-04-06 17:35", 1234, 111, 5), 
]).toDF(["datetime", "userId", "memberId", "value"]) 

(df.withColumn("datetime", col("datetime").cast("timestamp")) 
    .groupBy("userId", "memberId") 
    .agg(max_("datetime"))) 

## +------+--------+--------------------+ 
## |userId|memberId|  max(datetime)| 
## +------+--------+--------------------+ 
## | 1234|  111|2016-04-06 17:35:...| 
## +------+--------+--------------------+ 
+0

grazie questo mi ha aiutato. Ma, per favore, puoi chiarire se dobbiamo parallelizzare come hai fatto nel primo passaggio. Ho già un dataframe nel formato indicato come nella Q. Scusate se la mia query è di base perché sono un principiante da far scoppiare. – cool716

+0

Non è necessario parallelizzare. Creo il dataframe qui solo per fornire un esempio riproducibile. – zero323

+0

Grazie. Inoltre, se ho 2 colonne di date, come posso ottenere il massimo di ciascuna con le stesse colonne di raggruppamento? Ecco il df: df = sc.parallelize ([ ("01-0435 16:36", "2016-04-05 16:36", 1234, 111, 1), ("2016-04- 06 17:35 "," 2016-04-08 17:35 ", 1234, 111, 5), ]). ToDF ([" datetime1 "," datetime2 "," userId "," memberId "," valore " ]) – cool716