2016-05-11 27 views
5

miei dati è come:SPARK dataframe: Rimuovere il valore MAX in un gruppo

id | val 
---------------- 
a1 | 10 
a1 | 20 
a2 | 5 
a2 | 7 
a2 | 2 

sto cercando di eliminare la riga che ha MAX (val) nel gruppo se il gruppo I a "id".

risultato dovrebbe essere come:

id | val 
---------------- 
a1 | 10 
a2 | 5 
a2 | 2 

Sto usando SPARK dataframe e SqlContext. Mi serve in qualche modo:

DataFrame df = sqlContext.sql("SELECT * FROM jsontable WHERE (id, val) NOT IN (SELECT is,MAX(val) from jsontable GROUP BY id)"); 

Come posso farlo?

risposta

0

Ecco come fare questo usando RDD e un approccio più Scala al gusto:

// Let's first get the data in key-value pair format 
val data = sc.makeRDD(Seq(("a",20), ("a", 1), ("a",8), ("b",3), ("b",10), ("b",9))) 

// Next let's find the max value from each group 
val maxGroups = data.reduceByKey(Math.max(_,_)) 

// We join the max in the group with the original data 
val combineMaxWithData = maxGroups.join(data) 

// Finally we filter out the values that agree with the max 
val finalResults = combineMaxWithData.filter{ case (gid, (max,curVal)) => max != curVal }.map{ case (gid, (max,curVal)) => (gid,curVal) } 


println(finalResults.collect.toList) 
>List((a,1), (a,8), (b,3), (b,9)) 
+0

Puoi piacere e postare il codice equivalente JAVA? – user3802925

+0

Non ho mai usato Spark su Java ... Ci dovrebbero essere equivalenti a Java per ognuna di queste API. Potrebbe non essere così difficile fare la traduzione. Le idee principali dovrebbero applicarsi. – marios

+0

Grazie Marios! Di seguito è riportata l'implementazione Java che avevo apportato con alcune modifiche su join, in cui mi unisco utilizzando una chiave invece di un join cartesiano completo. – user3802925

2

Potete farlo utilizzando le operazioni dataframe e funzioni della finestra. Supponendo di avere i dati in dataframe df1:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

val maxOnWindow = max(col("val")).over(Window.partitionBy(col("id"))) 
val df2 = df1 
    .withColumn("max", maxOnWindow) 
    .where(col("val") < col("max")) 
    .select("id", "val") 

In Java, l'equivalente sarebbe qualcosa di simile:

import org.apache.spark.sql.functions.Window; 
import static org.apache.spark.sql.functions.*; 

Column maxOnWindow = max(col("val")).over(Window.partitionBy("id")); 
DataFrame df2 = df1 
    .withColumn("max", maxOnWindow) 
    .where(col("val").lt(col("max"))) 
    .select("id", "val"); 

Ecco un bell'articolo sulle funzioni della finestra: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

+0

Grazie Daniel! Sto provando il tuo approccio ora. Mi chiedo solo che sia meglio dell'approccio "join"? – user3802925

+0

@ user3802925 Dato che i dati sono originariamente in un DataFrame, questo approccio evita di convertirlo in RDD, che è già un guadagno sia in termini di prestazioni che di leggibilità del codice. Tuttavia, ritengo che la differenza di prestazioni effettiva tra entrambi gli approcci dipenda dai dati e richiederebbe alcuni test per trarre conclusioni. –

+0

@ user3802925: Siamo spiacenti, non ho visto la versione di join con DataFrame. In questo caso, sta solo la mia seconda affermazione. –

1

riportano di seguito le Implementazione Java del codice scala di Mario:

DataFrame df = sqlContext.read().json(input); 
DataFrame dfMaxRaw = df.groupBy("id").max("val"); 
DataFrame dfMax = dfMaxRaw.select(
    dfMaxRaw.col("id").as("max_id"), dfMaxRaw.col("max(val)").as("max_val") 
); 
DataFrame combineMaxWithData = df.join(dfMax, df.col("id") 
    .equalTo(dfMax.col("max_id"))); 
DataFrame finalResult = combineMaxWithData.filter(
    combineMaxWithData.col("id").equalTo(combineMaxWithData.col("max_id")) 
     .and(combineMaxWithData.col("val").notEqual(combineMaxWithData.col("max_val"))) 
); 
+0

sembra buono, anche se un po 'prolisso, ma è previsto da Java :) – marios