2016-07-17 305 views
5

Sto cercando di utilizzare SQL su un frame di dati spark. Ma il frame di dati ha 1 valore ha stringa (che è JSON come la struttura):Come eseguire una query sul frame di dati in cui 1 campo di StringType ha valore json in Spark SQL

ho salvato il mio telaio dati alla tabella temporanea: TestTable

Quando ho fatto disc:

col_name      data_type 
requestId      string 
name       string 
features      string 

Ma dispone di valori è un jSON:

{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}} 

voglio solo interrogare sulla TabellaProva dove totalSpent> 10. Può un certo dirmi come faccio a fare questo?

Il mio file JSON assomiglia:

{ 
     "requestId": 232323, 
     "name": "ravi", 
     "features": "{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}" 
    } 

caratteristiche è una stringa. Ho solo bisogno di TotalSpent in questo. Ho provato con:

val features = StructType( 
Array(StructField("totalSpent",LongType,true), 
StructField("movies",LongType,true) 
)) 

val schema = StructType(Array( 
StructField("requestId",StringType,true), 
StructField("name",StringType,true), 
StructField("features",features,true), 
) 
) 

val records = sqlContext.read.schema(schema).json(filePath) 

Poiché ogni richiesta ha una stringa di caratteristiche JSON. Ma questo mi dà errore.

root 
|-- requestId: string (nullable = true) 
|-- features: string (nullable = true) 
|-- name: string (nullable = true) 

Posso usare parallelizzare all'interno StructField durante la creazione dello schema:

Quando ho provato con

val records = sqlContext.jsonFile(filePath) 

records.printSchema 

mi mostra? Ho provato con:

I first tried with : 

val customer = StructField("features",StringType,true) 
val events = sc.parallelize(customer :: Nil) 


val schema = StructType(Array( 
    StructField("requestId",StringType,true), 
    StructField("name", StructType(events, true),true), 
    StructField("features",features,true), 
    ) 
    ) 

Questo mi dà errore pure. provato anche:

import net.liftweb.json.parse 

case class KV(k: String, v: Int) 

val parseJson = udf((s: String) => { 
    implicit val formats = net.liftweb.json.DefaultFormats 
    parse(s).extract[KV] 
}) 

val parsed = records.withColumn("parsedJSON", parseJson($"features")) 
parsed.show 

This gives me : 
<console>:78: error: object liftweb is not a member of package net 
     import net.liftweb.json.parse 

provato:

ho provato con:

val parseJson = udf((s: String) => { 
    sqlContext.read.json(s) 
}) 

val parsed = records.withColumn("parsedJSON", parseJson($"features")) 
parsed.show 

Ma ancora una volta l'errore.

provato:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 

val parseJson = udf((s: String) => { 
parse(s) 
}) 

val parsed = records.withColumn("parsedJSON", parseJson($"features")) 
parsed.show 

ma mi dà:

java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported 
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) 
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) 

Questo mi dà il corretto schema (in base alla risposta data da zero323:

val extractFeatures = udf((features: String) => Try { 
implicit val formats = DefaultFormats 
    parse(features).extract[Features] 
}.toOption) 

val parsed = records.withColumn("features", extractFeatures($"features")) 

parsed.printSchema

Ma quando interrogo:

val value = parsed.filter($"requestId" === "232323").select($"features.totalSpent") 

value.show gives null.

+0

@ zero323: Ciao, ho attraversato l'altra domanda u rispose, ma non lo faccio abbastanza capisco. Puoi spiegarmi per favore? Sono molto nuovo a scintillare. – Swetha

+0

Quale parte non è chiara? È una risposta lunga che mostra metodi diversi a seconda dei requisiti e della versione. Prendi in considerazione una soluzione particolare? – zero323

+0

@ zero323 Ho provato ad usare il parallelismo sul mio schema perché utilizzo lo schema per ottenere altri valori, quindi preferisco usarlo nello schema ma dà errore. Sono propenso a UDF per analizzare JSON, in cui hai citato case class KV (k: String, v: Int) È a causa del modo in cui il suo json è formattato? – Swetha

risposta

2

Quando si restituiscono dati da UDF, è necessario rappresentarli come tipi SQL e JSON AST no.Un approccio è quello di creare una classe caso simile a questo:

case class Features(
    places: Integer, 
    movies: Integer, 
    totalPlacesVisited: Integer, 
    totalSpent: Integer, 
    SpentMap: Map[String, Integer], 
    benefits: Map[String, Integer] 
) 

e utilizzarlo per extract oggetti:

val df = Seq((
    232323, "ravi", 
    """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}""" 
)).toDF("requestId", "name", "features") 

val extractFeatures = udf((features: String) => 
    parse(features).extract[Features]) 

val parsed = df.withColumn("features", extractFeatures($"features")) 
parsed.show(false) 

// +---------+----+-----------------------------------------------------------------+ 
// |requestId|name|features               | 
// +---------+----+-----------------------------------------------------------------+ 
// |232323 |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]| 
// +---------+----+-----------------------------------------------------------------+ 

parsed.printSchema 

// root 
// |-- requestId: integer (nullable = false) 
// |-- name: string (nullable = true) 
// |-- features: struct (nullable = true) 
// | |-- places: integer (nullable = true) 
// | |-- movies: integer (nullable = true) 
// | |-- totalPlacesVisited: integer (nullable = true) 
// | |-- totalSpent: integer (nullable = true) 
// | |-- SpentMap: map (nullable = true) 
// | | |-- key: string 
// | | |-- value: integer (valueContainsNull = true) 
// | |-- benefits: map (nullable = true) 
// | | |-- key: string 
// | | |-- value: integer (valueContainsNull = true) 

seconda gli altri record e il loro utilizzo previsto è necessario regolare la rappresentazione e aggiungere errori rilevanti gestione della logica.

È inoltre possibile utilizzare DSL per accedere ai singoli campi come stringhe:

val getMovieSpent = udf((s: String) => 
    compact(render(parse(s) \\ "SpentMap" \\ "Movie"))) 

df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show 
// +---------+----+--------------------+-----------+ 
// |requestId|name|   features|movie_spent| 
// +---------+----+--------------------+-----------+ 
// | 232323|ravi|{"places":11,"mov...|   2| 
// +---------+----+--------------------+-----------+ 

Per approcci alternativi vedere How to query JSON data column using Spark DataFrames?

+0

Sì. Il mio male :) Grazie per la spiegazione dettagliata. – Swetha

+0

Funziona bene. Ma provo parsed.show Mi dà: java.lang.NullPointerExceptio – Swetha

+0

Come ho detto prima, avrai bisogno di un'appropriata gestione delle eccezioni e probabilmente alcuni aggiustamenti a seconda della regolarità dei tuoi dati. Per cominciare, puoi usare 'scala.util.Try (...). ToOption', ma è un approccio molto approssimativo. – zero323