2016-04-06 27 views
13

Sto tentando di utilizzare l'API Spark Dataset ma sto riscontrando alcuni problemi nel fare un semplice join.Spark Dataset API - join

Diciamo che ho due set di dati con i campi: date | value, quindi nel caso di DataFrame il mio unirsi sarebbe simile:

val dfA : DataFrame 
val dfB : DataFrame 

dfA.join(dfB, dfB("date") === dfA("date")) 

Tuttavia, per Dataset c'è il metodo .joinWith, ma lo stesso approccio non funziona :

val dfA : Dataset 
val dfB : Dataset 

dfA.joinWith(dfB, ?) 

Qual è l'argomento richiesto da .joinWith?

risposta

19

Per utilizzare joinWith devi prima creare un DataSet e molto probabilmente due di loro. Per creare un DataSet, è necessario creare una classe di caso che corrisponda allo schema e chiamare dove T è la classe del caso. Quindi:

case class KeyValue(key: Int, value: String) 
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value") 
val ds = df.as[KeyValue] 
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string] 

Si potrebbe anche saltare la classe caso e utilizzare una tupla:

val tupDs = df.as[(Int,String)] 
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string] 

Quindi se si ha un altro caso di classe/DF, in questo modo dire:

case class Nums(key: Int, num1: Double, num2: Long) 
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2") 
val ds2 = df2.as[Nums] 
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint] 

Poi , mentre la sintassi di join e joinWith sono simili, i risultati sono diversi:

df.join(df2, df.col("key") === df2.col("key")).show 
// +---+-----+---+----+----+ 
// |key|value|key|num1|num2| 
// +---+-----+---+----+----+ 
// | 1| asdf| 1| 7.7| 101| 
// | 2|34234| 2| 1.2| 10| 
// +---+-----+---+----+----+ 

ds.joinWith(ds2, df.col("key") === df2.col("key")).show 
// +---------+-----------+ 
// |  _1|   _2| 
// +---------+-----------+ 
// | [1,asdf]|[1,7.7,101]| 
// |[2,34234]| [2,1.2,10]| 
// +---------+-----------+ 

Come potete vedere, joinWith lascia intatti gli oggetti come parti di una tupla, mentre join appiattisce le colonne in un singolo spazio dei nomi. (. Che sarà causare problemi nel caso di cui sopra, perché la "chiave" nome di colonna viene ripetuto)

Curiosamente, devo usare df.col("key") e df2.col("key") di creare le condizioni per l'adesione ds e ds2 - se si utilizza solo col("key") su entrambi i lati non funziona, e ds.col(...) non esiste. L'utilizzo dell'originale df.col("key") fa il trucco, tuttavia.

+3

spiegazione dettagliata. Solo una confusione. C'è un modo migliore per scrivere la condizione di join digitato. per es. df.col ("chiave") possiamo avere qualcosa di più sicuro che può risolvere la correttezza della "chiave" in fase di compilazione. –

+5

Sono completamente d'accordo, in base a questa sintassi non è utile creare il set di dati, quindi qual è il vantaggio? Non riesco a superare il fatto che non ci sia un'alternativa dattiloscritta .. Peccato! – Sparky

2

Nell'esempio di cui sopra si può provare sotto l'opzione -

  • definire una classe caso per l'output

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Unire due dataset con "Seq (" chiave ")", questo consente di evitare due colonne chiave duplicate nell'output.Che contribuirà ad applicare la classe di caso o di recuperare i dati in fase successiva

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+

+0

non rispondi in modo specifico alla domanda, ma il suggerimento Seq ("chiave") mi ha aiutato – ImDarrenG