2016-05-03 26 views
7

Spark documentation mostra come creare un DataFrame da un RDD, utilizzando le classi caso Scala per dedurre uno schema. Sto cercando di riprodurre questo concetto usando sqlContext.createDataFrame(RDD, CaseClass), ma il mio DataFrame finisce vuoto. Ecco il mio codice Scala:Come convertire un RDD basato sulla classe Case in un DataFrame?

// sc is the SparkContext, while sqlContext is the SQLContext. 

// Define the case class and raw data 
case class Dog(name: String) 
val data = Array(
    Dog("Rex"), 
    Dog("Fido") 
) 

// Create an RDD from the raw data 
val dogRDD = sc.parallelize(data) 

// Print the RDD for debugging (this works, shows 2 dogs) 
dogRDD.collect().foreach(println) 

// Create a DataFrame from the RDD 
val dogDF = sqlContext.createDataFrame(dogRDD, classOf[Dog]) 

// Print the DataFrame for debugging (this fails, shows 0 dogs) 
dogDF.show() 

L'uscita che sto vedendo è:

Dog(Rex) 
Dog(Fido) 
++ 
|| 
++ 
|| 
|| 
++ 

Che cosa mi manca?

Grazie!

risposta

12

Tutto ciò che serve è solo

val dogDF = sqlContext.createDataFrame(dogRDD) 

secondo parametro è parte di API Java e si aspetta che tu classe segue Java Beans convenzione (getter/setter). La tua classe di caso non segue questa convenzione, quindi non viene rilevata alcuna proprietà, che porta a DataFrame vuoto senza colonne.

+1

questo ha funzionato. Ho anche dovuto spostare la definizione della classe case al di fuori della mia funzione principale per evitare l'errore: Nessun TypeTag disponibile per Dog'. Grazie! – sparkour

+0

Vedo, molto interessante, quindi il secondo parametro è sempre richiesto solo quando si chiama dall'API Java, scala individuerà solo automaticamente i campi del Tipo che dovrebbero essere convertiti in colonne? – qwwqwwq

5

È possibile creare un DataFrame direttamente da un Seq di istanze di classe di casi utilizzando toDF come segue:

val dogDf = Seq(Dog("Rex"), Dog("Fido")).toDF 
0

Caso approccio di classe non funzionano in modalità cluster. Darà ClassNotFoundException alla classe di case che hai definito.

convertirlo un RDD[Row] e definire lo schema del vostro RDD con StructField e poi createDataFrame come

val rdd = data.map { attrs => Row(attrs(0),attrs(1)) } 

val rddStruct = new StructType(Array(StructField("id", StringType, nullable = true),StructField("pos", StringType, nullable = true))) 

sqlContext.createDataFrame(rdd,rddStruct) 

toDF() abituato lavoro sia