2015-09-11 4 views
6

Perché il seguente messaggio restituisce un errore?Perché il compilatore Spark/Scala non riesce a trovare toDF su RDD [Mappa [Int, Int]]?

scala> import sqlContext.implicits._ 
import sqlContext.implicits._ 

scala> val rdd = sc.parallelize(1 to 10).map(x => (Map(x -> 0), 0)) 
rdd: org.apache.spark.rdd.RDD[(scala.collection.immutable.Map[Int,Int], Int)] = MapPartitionsRDD[20] at map at <console>:27 

scala> rdd.toDF 
res8: org.apache.spark.sql.DataFrame = [_1: map<int,int>, _2: int] 

scala> val rdd = sc.parallelize(1 to 10).map(x => Map(x -> 0)) 
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]] = MapPartitionsRDD[23] at map at <console>:27 

scala> rdd.toDF 
<console>:30: error: value toDF is not a member of org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]] 
       rdd.toDF 

Così che cosa esattamente sta succedendo qui, todf può convertire RDD di tipo (scala.collection.immutable.Map[Int,Int], Int) per dataframe, ma non di tipo scala.collection.immutable.Map[Int,Int]. Perché?

risposta

9

Per lo stesso motivo per cui non è possibile utilizzare

sqlContext.createDataFrame(1 to 10).map(x => Map(x -> 0)) 

Se si dà un'occhiata alla fonte org.apache.spark.sql.SQLContext troverete due diverse implementazioni del metodo createDataFrame:

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame 

e

def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame 

Come potete vedere entrambi richiedono A essere una sottoclasse di Product. Quando si chiama toDF su un RDD[(Map[Int,Int], Int)] funziona perché Tuple2 è effettivamente un Product. Map[Int,Int] da solo non è quindi l'errore.

È possibile farlo funzionare avvolgendo Map con Tuple1:

sc.parallelize(1 to 10).map(x => Tuple1(Map(x -> 0))).toDF 
5

Fondamentalmente perché non v'è alcuna implicita di creare un dataframe per una mappa all'interno di un RDD.

Nel primo esempio si restituisce una tupla, che è un prodotto per il quale esiste una conversione implicita.

rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A])

Nel secondo esempio si utilizza una mappa nella tua RDD, per i quali non v'è alcuna conversione implicita.