2016-05-03 39 views
7

Ho giocato con la conversione di RDD in DataFrames e viceversa. Innanzitutto, ho avuto un RDD di tipo (Int, Int) chiamato dataPair. Poi ho creato un oggetto dataframe con intestazioni di colonna utilizzando:Come convertire un RDD [Row] di nuovo in DataFrame

val dataFrame = dataPair.toDF(header(0), header(1)) 

Poi ho convertito da un dataframe di nuovo ad un RDD utilizzando:

val testRDD = dataFrame.rdd 

che restituisce un RDD di tipo org.apache.spark. sql.Row (not (Int, Int)). Poi mi piacerebbe riconvertirlo in un RDD utilizzando .toDF ma ottengo un errore:

error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 

ho provato la definizione di uno schema del tipo di dati (int, int) per testRDD, ma ho tipo eccezioni disadattamento:

error: type mismatch; 
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 
required: org.apache.spark.rdd.RDD[Data] 
    val testRDD: RDD[Data] = dataFrame.rdd 
            ^

ho già importato

import sqlContext.implicits._ 

risposta

13

Per creare un dataframe da un RDD di righe, di solito si hanno due opzioni principali:

1) È possibile utilizzare toDF() che può essere importato da import sqlContext.implicits._. Tuttavia, questo approccio funziona solo per i seguenti tipi di RDDs:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(fonte: Scaladoc dell'oggetto SQLContext.implicits)

Il las t signature in realtà significa che può funzionare per un RDD di tuple o un RDD di case classes (perché le tuple e le case classes sono sottoclassi di scala.Product).

Quindi, per utilizzare questo approccio per un RDD[Row], è necessario associarlo a un RDD[T <: scala.Product]. Questo può essere fatto mappando ogni riga a una classe custodia personalizzata o una tupla, come nei seguenti frammenti di codice:

val df = rdd.map({ 
    case Row(val1: String, ..., valN: Long) => (val1, ..., valN) 
}).toDF("col1_name", ..., "colN_name") 

o

case class MyClass(val1: String, ..., valN: Long = 0L) 
val df = rdd.map({ 
    case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN) 
}).toDF("col1_name", ..., "colN_name") 

Lo svantaggio principale di questo approccio (a mio parere) è che devi impostare in modo esplicito lo schema del DataFrame risultante nella funzione mappa, colonna per colonna. Forse questo può essere fatto programmaticamente se non si conosce lo schema in anticipo, ma le cose possono diventare un po 'confuse lì. Così, in alternativa, c'è un'altra opzione:


2) È possibile utilizzare createDataFrame(rowRDD: RDD[Row], schema: StructType), che è disponibile nell'oggetto SQLContext.Esempio:

val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema) 

Si noti che non è necessario impostare esplicitamente alcuna colonna dello schema. Riutilizziamo lo schema del vecchio DF, che è della classe StructType e può essere facilmente esteso. Tuttavia, questo approccio a volte non è possibile e in alcuni casi può essere meno efficiente del primo.

Spero sia più chiaro di prima. Saluti.

+0

L'ho capito, avevo bisogno di mappare allo schema Dati usando il seguente: 'val df = testRDD.map {riga del caso (n1: Int, n2: Int) => Dati (n1, n2)}.() ' – TheElysian

+0

Bello, è davvero un'opzione. La soluzione con createDataFrame è più generica, consentendo la conversione anche se non si conoscono quanti campi ha il dataframe originale. –

+0

Ho provato ad usarlo, ma ho continuato a ricevere errori riguardo il sovraccarico del metodo createDataFrame. Grazie comunque. – TheElysian