2015-06-14 3 views
5

Dopo l'analisi alcuni jsons ho un dataframe una colonna di arrayCome posso flatMap una fila di matrici in più righe?

scala> val jj =sqlContext.jsonFile("/home/aahu/jj2.json") 
res68: org.apache.spark.sql.DataFrame = [r: array<bigint>] 
scala> jj.first() 
res69: org.apache.spark.sql.Row = [List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)] 

Vorrei esplodere ogni riga fuori in più righe. Come?

edit:

originale del file JSON:

{"r": [0,1,2,3,4,5,6,7,8,9]} 
{"r": [0,1,2,3,4,5,6,7,8,9]} 

Voglio una RDD o un dataframe con 20 righe.

non posso semplicemente utilizzare flatMap qui - io non sono sicuro di quello che il comando appropriato nella scintilla è:

scala> jj.flatMap(r => r) 
<console>:22: error: type mismatch; 
found : org.apache.spark.sql.Row 
required: TraversableOnce[?] 
       jj.flatMap(r => r) 
+0

per favore pubblica l'esempio json originale e l'esempio del risultato che ti aspetti – vvladymyrov

+0

@vvladymyrov è nella modifica – dranxo

risposta

4

È possibile utilizzare DataFrame.explode per ottenere ciò che desideri. Di seguito è riportato ciò che ho provato in spark-shell con i dati del campione JSON.

import scala.collection.mutable.ArrayBuffer 
val jj1 = jj.explode("r", "r1") {list : ArrayBuffer[Long] => list.toList } 
val jj2 = jj1.select($"r1") 
jj2.collect 

È possibile fare riferimento alla documentazione API per capire di più DataFrame.explode

+1

Quale versione di spark/scala sei? Sto usando 1.4 in modalità locale e ottieni java.lang.ClassCastException: scala.collection.immutable. $ Colon $ colon non può essere lanciato su scala.collection.mutable.ArrayBuffer quando provo questo – dranxo

+0

Sto usando 1.3.1 –

+0

Grazie, appena provato su 1.3.1 e funziona – dranxo

3

Ho provato questo con Spark 1.3.1 oppure è possibile utilizzare la funzione Row.getAs:

import scala.collection.mutable.ArrayBuffer 
val elementsRdd = jj.select(jj("r")).map(t=>t.getAs[ArrayBuffer[Long]](0)).flatMap(x=>x) 
elementsRdd.count() 
>>>Long = 20 
elementsRdd.take(5) 
>>>Array[Long] = Array(0, 1, 2, 3, 4) 
+0

Funziona anche. – dranxo

2

In Spark 1.3+ è possibile utilizzare la funzione explode direttamente sulla colonna di interesse:

import org.apache.spark.sql.functions.explode 

jj.select(explode($"r"))