10

Ho una tabella Cassandra che per semplicità simile a:Come interrogare la colonna di dati JSON usando Spark DataFrames?

key: text 
jsonData: text 
blobData: blob 

posso creare una cornice di dati di base per questo utilizzando scintilla e la scintilla-cassandra-connettore utilizzando:

val df = sqlContext.read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "mytable", "keyspace" -> "ks1")) 
    .load() 

I' Sto lottando però per espandere i dati JSON nella sua struttura sottostante. Alla fine voglio essere in grado di filtrare in base agli attributi all'interno della stringa json e restituire i dati blob. Qualcosa come jsonData.foo = "bar" e restituisce blobData. Questo è attualmente possibile?

+0

"chiave" è un identificativo univoco? – zero323

+0

sì, la chiave è la chiave primaria della tabella – JDesuv

risposta

27

Spark 2.1+

È possibile utilizzare la funzione from_json:

import org.apache.spark.sql.functions.from_json 
import org.apache.spark.sql.types._ 

val schema = StructType(Seq(
    StructField("k", StringType, true), StructField("v", DoubleType, true) 
)) 

df.withColumn("jsonData", from_json($"jsonData", schema)) 

Spark 1.6 +

È possibile utilizzare get_json_object che prende una colonna e un percorso:

import org.apache.spark.sql.functions.get_json_object 

val exprs = Seq("k", "v").map(
    c => get_json_object($"jsonData", s"$$.$c").alias(c)) 

df.select($"*" +: exprs: _*) 

ed estrae i campi su singole stringhe che possono essere ulteriormente castate ai tipi previsti.

Spark < = 1.5:

È questo attualmente possibile?

Per quanto ne so non è direttamente possibile. Si può provare qualcosa di simile a questo:

val df = sc.parallelize(Seq(
    ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), 
    ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") 
)).toDF("key", "jsonData", "blobData") 

Presumo che blob campo non può essere rappresentato in JSON. In caso contrario, si cabina omettere la scissione e la giunzione:

import org.apache.spark.sql.Row 

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") 
val jsons = sqlContext.read.json(df.drop("blobData").map{ 
    case Row(key: String, json: String) => 
    s"""{"key": "$key", "jsonData": $json}""" 
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") 
parsed.printSchema 

// root 
// |-- jsonData: struct (nullable = true) 
// | |-- k: string (nullable = true) 
// | |-- v: double (nullable = true) 
// |-- key: long (nullable = true) 
// |-- blobData: string (nullable = true) 

Un'alternativa (più economico, anche se più complesso) approccio è quello di utilizzare un UDF per analizzare JSON e l'uscita di una colonna o di structmap. Per esempio qualcosa di simile:

import net.liftweb.json.parse 

case class KV(k: String, v: Int) 

val parseJson = udf((s: String) => { 
    implicit val formats = net.liftweb.json.DefaultFormats 
    parse(s).extract[KV] 
}) 

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) 
parsed.show 

// +---+--------------------+------------------+----------+ 
// |key|   jsonData|   blobData|parsedJSON| 
// +---+--------------------+------------------+----------+ 
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| 
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| 
// +---+--------------------+------------------+----------+ 

parsed.printSchema 

// root 
// |-- key: string (nullable = true) 
// |-- jsonData: string (nullable = true) 
// |-- blobData: string (nullable = true) 
// |-- parsedJSON: struct (nullable = true) 
// | |-- k: string (nullable = true) 
// | |-- v: integer (nullable = false) 
0

sottostante JSON stringa è

"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}"; 

Di seguito si riporta lo script per filtrare il JSON e caricare i dati richiesti a Cassandra.

sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2") 
      .write.format("org.apache.spark.sql.cassandra") 
      .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name")) 
      .mode(SaveMode.Append) 
      .save() 
1

La funzione from_json è esattamente quello che stai cercando. Il tuo codice sarà simile al seguente:

val df = sqlContext.read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "mytable", "keyspace" -> "ks1")) 
    .load() 

//You can define whatever struct type that your json states 
val schema = StructType(Seq(
    StructField("key", StringType, true), 
    StructField("value", DoubleType, true) 
)) 

df.withColumn("jsonData", from_json(col("jsonData"), schema))