2015-09-23 10 views
12

Ho un RDD con una tupla di valori (String, sparsevector) e voglio creare un dataframe utilizzando il RDD. Per ottenere un (etichetta: stringa, caratteristiche: vettore) DataFrame che è lo schema richiesto dalla maggior parte delle librerie dell'algoritmo ml. So che può essere fatto perché la libreria HashingTF ml restituisce un vettore quando viene fornita una colonna delle caratteristiche di un dataframe .Come faccio a convertire un RDD con una colonna sparsevector ad un dataframe con una colonna come vettore

temp_df = sqlContext.createDataFrame(temp_rdd, StructType([ 
     StructField("label", DoubleType(), False), 
     StructField("tokens", ArrayType(StringType()), False) 
    ])) 

#assumming there is an RDD (double,array(strings)) 

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features") 

ndf = hashingTF.transform(temp_df) 
ndf.printSchema() 

#outputs 
#root 
#|-- label: double (nullable = false) 
#|-- tokens: array (nullable = false) 
#| |-- element: string (containsNull = true) 
#|-- features: vector (nullable = true) 

Quindi la mia domanda è, posso in qualche modo avere un RDD di (String, sparsevector) convertirlo in un dataframe di (String, vettoriale). Ho provato con il solito sqlContext.createDataFrame ma non c'è lo DataType che si adatta alle esigenze che ho.

df = sqlContext.createDataFrame(rdd,StructType([ 
     StructField("label" , StringType(),True), 
     StructField("features" , ?Type(),True) 
    ])) 

risposta

17

Devi usare VectorUDT qui:

# In Spark 1.x 
# from pyspark.mllib.linalg import SparseVector, VectorUDT 
from pyspark.ml.linalg import SparseVector, VectorUDT 

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 

## root 
## |-- label: double (nullable = true) 
## |-- features: vector (nullable = true) 

Solo per completezza Scala equivalente:

import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.types.{DoubleType, StructType} 
// In Spark 1x. 
// import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} 
import org.apache.spark.ml.linalg.Vectors 
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType 

val schema = new StructType() 
    .add("label", DoubleType) 
    // In Spark 1.x 
    //.add("features", new VectorUDT()) 
    .add("features",VectorType) 

val temp_rdd: RDD[Row] = sc.parallelize(Seq(
    Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))), 
    Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5)))) 
)) 

spark.createDataFrame(temp_rdd, schema).printSchema 

// root 
// |-- label: double (nullable = true) 
// |-- features: vector (nullable = true) 
+2

Wow, cercavo questo da secoli! quasi un grido di felicità:,) +1 –

+1

Questo ha funzionato! Grazie mille! puoi dirmi dove si trova la documentazione? Non riesco a trovare alcun VectorUDT su linalg apache spark Docs –

+0

@OrangelMarquez forse è richiesta una richiesta di pull –

4

Mentre @ zero323 rispondere https://stackoverflow.com/a/32745924/1333621 ha un senso, e vorrei che ha funzionato per me - il rdd sottostante il dataframe, sqlContext.createDataFrame (temp_rdd, schema), i tipi SparseVectors ancora contenuti ho dovuto fare quanto segue per convertire i tipi DenseVector - se qualcuno ha un modo più breve/più voglio sapere

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 
df_w_ftr = temp_rdd.toDF(schema) 

print 'original convertion method: ',df_w_ftr.take(5) 
print('\n') 
temp_rdd_dense = temp_rdd.map(lambda x: Row(label=x[0],features=DenseVector(x[1].toArray()))) 
print type(temp_rdd_dense), type(temp_rdd) 
print 'using map and toArray:', temp_rdd_dense.take(5) 

temp_rdd_dense.toDF().show() 

root 
|-- label: double (nullable = true) 
|-- features: vector (nullable = true) 

original convertion method: [Row(label=0.0, features=SparseVector(4, {1: 1.0, 3: 5.5})), Row(label=1.0, features=SparseVector(4, {0: -1.0, 2: 0.5}))] 


<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.RDD'> 
using map and toArray: [Row(features=DenseVector([0.0, 1.0, 0.0, 5.5]), label=0.0), Row(features=DenseVector([-1.0, 0.0, 0.5, 0.0]), label=1.0)] 

+------------------+-----+ 
|   features|label| 
+------------------+-----+ 
| [0.0,1.0,0.0,5.5]| 0.0| 
|[-1.0,0.0,0.5,0.0]| 1.0| 
+------------------+-----+ 
1

questo è un esempio in scala scintilla 2,1

import org.apache.spark.ml.linalg.Vector 

def featuresRDD2DataFrame(features: RDD[Vector]): DataFrame = { 
    import sparkSession.implicits._ 
    val rdd: RDD[(Double, Vector)] = features.map(x => (0.0, x)) 
    val df = rdd.toDF("label","features").select("features") 
    df 
    } 

il toDF() non è stato riconosciuto dal compilatore sulle caratteristiche rdd