2016-03-13 44 views
7

Ho provato ad applicare PCA ai miei dati e quindi applicare RandomForest ai dati trasformati. Tuttavia, PCA.transform (data) mi ha fornito un DataFrame ma ho bisogno di un mllib LabeledPoints per alimentare il mio RandomForest. Come lo posso fare? Il mio codice:Come convertire spark DataFrame in RDD mllib LabeledPoints?

import org.apache.spark.mllib.util.MLUtils 
    import org.apache.spark.{SparkConf, SparkContext} 
    import org.apache.spark.mllib.tree.RandomForest 
    import org.apache.spark.mllib.tree.model.RandomForestModel 
    import org.apache.spark.ml.feature.PCA 
    import org.apache.spark.mllib.regression.LabeledPoint 
    import org.apache.spark.mllib.linalg.Vectors 


    val dataset = MLUtils.loadLibSVMFile(sc, "data/mnist/mnist.bz2") 

    val splits = dataset.randomSplit(Array(0.7, 0.3)) 

    val (trainingData, testData) = (splits(0), splits(1)) 

    val trainingDf = trainingData.toDF() 

    val pca = new PCA() 
    .setInputCol("features") 
    .setOutputCol("pcaFeatures") 
    .setK(100) 
    .fit(trainingDf) 

    val pcaTrainingData = pca.transform(trainingDf) 

    val numClasses = 10 
    val categoricalFeaturesInfo = Map[Int, Int]() 
    val numTrees = 10 // Use more in practice. 
    val featureSubsetStrategy = "auto" // Let the algorithm choose. 
    val impurity = "gini" 
    val maxDepth = 20 
    val maxBins = 32 

    val model = RandomForest.trainClassifier(pcaTrainingData, numClasses, categoricalFeaturesInfo, 
     numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) 


    error: type mismatch; 
    found : org.apache.spark.sql.DataFrame 
    required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] 

ho provato i seguenti due possibili soluzioni, ma non ha funzionato:

scala> val pcaTrainingData = trainingData.map(p => p.copy(features = pca.transform(p.features))) 
<console>:39: error: overloaded method value transform with alternatives: 
    (dataset: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame <and> 
    (dataset: org.apache.spark.sql.DataFrame,paramMap: org.apache.spark.ml.param.ParamMap)org.apache.spark.sql.DataFrame <and> 
    (dataset: org.apache.spark.sql.DataFrame,firstParamPair: org.apache.spark.ml.param.ParamPair[_],otherParamPairs: org.apache.spark.ml.param.ParamPair[_]*)org.apache.spark.sql.DataFrame 
    cannot be applied to (org.apache.spark.mllib.linalg.Vector) 

E:

org.apache.spark
 val labeled = pca 
    .transform(trainingDf) 
    .map(row => LabeledPoint(row.getDouble(0), row(4).asInstanceOf[Vector[Int]])) 

    error: type mismatch; 
    found : scala.collection.immutable.Vector[Int] 
    required: org.apache.spark.mllib.linalg.Vector 

(Ho importato .mllib.linalg.Vectors nel caso precedente)

Qualsiasi aiuto?

+1

il codice funziona bene per me (così com'è, senza che i due tentativi di soluzione). Suppongo che tu abbia sbagliato una delle importazioni? Sto usando 'import org.apache.spark.ml.feature.PCA',' importa org.apache.spark.mllib.util.MLUtils'. L'ho eseguito con questo file: https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2 –

+0

@TzachZohar Oh ho le stesse importazioni del tuo e ho modificato la mia domanda aggiungendoli. Ho anche usato lo stesso file di dati. È stato perché corro in shell piuttosto che spark-submit quindi non ha funzionato? –

+2

Perché tutti i downvotes? Sembra una domanda ragionevole. – javadba

risposta

10

L'approccio corretto qui è il secondo che hai provato - mappare ogni in un LabeledPoint per ottenere un RDD[LabeledPoint]. Tuttavia, ha due errori:

  1. la correttezza Vector classe (org.apache.spark.mllib.linalg.Vector) non prende argomenti di tipo (per esempio Vector[Int]) - quindi anche se si aveva l'importazione destra, il compilatore ha concluso che si intende scala.collection.immutable.Vector che fa.
  2. Il dataframe tornato da PCA.fit() ha 3 colonne, e si è tentato di estrarre il numero di colonna 4. Ad esempio, mostrando prime 4 linee:

    +-----+--------------------+--------------------+ 
    |label|   features|   pcaFeatures| 
    +-----+--------------------+--------------------+ 
    | 5.0|(780,[152,153,154...|[880.071111851977...| 
    | 1.0|(780,[158,159,160...|[-41.473039034112...| 
    | 2.0|(780,[155,156,157...|[931.444898405036...| 
    | 1.0|(780,[124,125,126...|[25.5114585648411...| 
    +-----+--------------------+--------------------+ 
    

    per rendere questo più facile - preferisco utilizzando la colonna nomi invece dei loro indici.

Quindi, ecco la trasformazione è necessario:

val labeled = pca.transform(trainingDf).rdd.map(row => LabeledPoint(
    row.getAs[Double]("label"), 
    row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures") 
))