2016-03-30 40 views
7

Ho un set di dati di dimensioni ridotte che sarà il risultato di un lavoro Spark. Sto pensando di convertire questo set di dati in un dataframe per comodità alla fine del lavoro, ma ho faticato a definire correttamente lo schema. Il problema è l'ultimo campo sottostante (topValues); è un ArrayBuffer di tuple - chiavi e conteggi.Spark: Creazione programmatica dello schema del dataframe in scala

val innerSchema = 
    StructType(
     Array(
     StructField("value", StringType), 
     StructField("count", LongType) 
    ) 
    ) 
    val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", innerSchema) 
    ) 
    ) 

    val result = stats.columnStats.map{ c => 
    Row(c._2.name, c._1, c._2.count, c._2.empties, c._2.nulls, c._2.uniqueValues, c._2.mean, c._2.min, c._2.max, c._2.topValues.topN) 
    } 

    val rdd = sc.parallelize(result.toSeq) 

    val outputDf = sqlContext.createDataFrame(rdd, outputSchema) 

    outputDf.show() 

L'errore che sto ottenendo è un MatchError: scala.MatchError: ArrayBuffer((10,2), (20,3), (8,1)) (of class scala.collection.mutable.ArrayBuffer)

Quando il debug e controllare i miei oggetti, sto vedendo questo:

rdd: ParallelCollectionRDD[2] 
rdd.data: "ArrayBuffer" size = 2 
rdd.data(0): [age,2,6,0,0,3,14.666666666666666,8.0,20.0,ArrayBuffer((10,2), (20,3), (8,1))] 
rdd.data(1): [gender,3,6,0,0,2,0.0,0.0,0.0,ArrayBuffer((M,4), (F,2))] 

Mi sembra che ho' Ho accuratamente descritto l'ArrayBuffer delle tuple nel mio innerSchema, ma Spark non è d'accordo.

Qualche idea su come dovrei definire lo schema?

+0

Sarebbe utile se fornisci dati di esempio o almeno un tipo esatto di 'rdd'. – zero323

risposta

10
val rdd = sc.parallelize(Array(Row(ArrayBuffer(1,2,3,4)))) 
val df = sqlContext.createDataFrame(
    rdd, 
    StructType(Seq(StructField("arr", ArrayType(IntegerType, false), false) 
) 

df.printSchema 
root 
|-- arr: array (nullable = false) 
| |-- element: integer (containsNull = false) 

df.show 
+------------+ 
|   arr| 
+------------+ 
|[1, 2, 3, 4]| 
+------------+ 
+0

Sì, ArrayType è l'approccio giusto. Grazie! Il mio schema finale è nella mia risposta. – Stuart

4

Come ha sottolineato David, avevo bisogno di usare un tipo di array. Spark ne è felice:

val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", ArrayType(StructType(Array(
      StructField("value", StringType), 
      StructField("count", LongType) 
     )))) 
    ) 
    )