2015-11-25 18 views
8

Ho un org.apache.spark.sql.DataFrame con più colonne. Voglio scala 1 colonna (lat_long_dist) utilizzando MinMax normalizzazione o qualsiasi tecnica per scalare i dati tra -1 e 1 e conservare il tipo di dati come org.apache.spark.sql.DataFrameNormalizzazione MinMax in scala

scala> val df = sqlContext.csvFile("tenop.csv") 
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string, 
    ip_crowding: string, lat_long_dist: double, stream_name_1: string] 

Ho trovato lo StandardScaler opzione ma che richiede di trasformare il set di dati prima che io possa fare la trasformazione. C'è un modo semplice e pulito.

risposta

9

Credo che quello che vuoi è qualcosa di simile

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{min, max, lit} 

val df = sc.parallelize(Seq(
    (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3) 
)).toDF("k", "v") 

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match { 
    case Row(x: Double, y: Double) => (x, y) 
} 

val scaledRange = lit(2) // Range of the scaled variable 
val scaledMin = lit(-1) // Min value of the scaled variable 
val vNormalized = ($"v" - vMin)/(vMax - vMin) // v normalized to (0, 1) range 

val vScaled = scaledRange * vNormalized + scaledMin 

df.withColumn("vScaled", vScaled).show 

// +---+-----+--------------------+ 
// | k| v|    vScaled| 
// +---+-----+--------------------+ 
// | 1| 0.5| -0.3093093093093092| 
// | 2| 10.2| 0.27327327327327344| 
// | 3| 5.7|0.003003003003003...| 
// | 4|-11.0|    -1.0| 
// | 5| 22.3|     1.0| 
// +---+-----+--------------------+ 
11

Ecco un altro suggerimento quando si sta già giocando con Spark.

Perché non usi MinMaxScaler in pacchetto ml?

Proviamo questo con lo stesso esempio da zero323.

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.ml.feature.MinMaxScaler 
import org.apache.spark.sql.functions.udf 

val df = sc.parallelize(Seq(
    (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3) 
)).toDF("k", "v") 

//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v")))) 

val vectorizeCol = udf((v:Double) => Vectors.dense(Array(v))) 
val df2 = df.withColumn("vVec", vectorizeCol(df("v")) 

val scaler = new MinMaxScaler() 
    .setInputCol("vVec") 
    .setOutputCol("vScaled") 
    .setMax(1) 
    .setMin(-1) 

scaler.fit(df2).transform(df2).show 
+---+-----+-------+--------------------+ 
| k| v|  vv|     vs| 
+---+-----+-------+--------------------+ 
| 1| 0.5| [0.5]|[-0.3093093093093...| 
| 2| 10.2| [10.2]|[0.27327327327327...| 
| 3| 5.7| [5.7]|[0.00300300300300...| 
| 4|-11.0|[-11.0]|    [-1.0]| 
| 5| 22.3| [22.3]|    [1.0]| 
+---+-----+-------+--------------------+ 

Sfruttare il ridimensionamento di più colonne contemporaneamente.

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0), 
    (2.0, 0.0, 0.0), 
    (0.0, 1.0, -1.0) 
)).toDF("a", "b", "c") 

import org.apache.spark.ml.feature.VectorAssembler 

val assembler = new VectorAssembler() 
    .setInputCols(Array("a", "b", "c")) 
    .setOutputCol("features") 

val df2 = assembler.transform(df) 

// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show 
+---+----+----+--------------+--------------------+ 
| a| b| c|  features|  scaledFeatures| 
+---+----+----+--------------+--------------------+ 
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|  [0.0,-1.0,1.0]| 
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...| 
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|  [-1.0,1.0,-1.0]| 
+---+----+----+--------------+--------------------+ 
+0

Ottima risposta, mi risparmia molto tempo :) –

+0

Felice che ti abbia aiutato @MostafaAlaa – Lyle