Sto cercando di utilizzare SQL su un frame di dati spark. Ma il frame di dati ha 1 valore ha stringa (che è JSON come la struttura):Come eseguire una query sul frame di dati in cui 1 campo di StringType ha valore json in Spark SQL
ho salvato il mio telaio dati alla tabella temporanea: TestTable
Quando ho fatto disc:
col_name data_type
requestId string
name string
features string
Ma dispone di valori è un jSON:
{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}
voglio solo interrogare sulla TabellaProva dove totalSpent> 10. Può un certo dirmi come faccio a fare questo?
Il mio file JSON assomiglia:
{
"requestId": 232323,
"name": "ravi",
"features": "{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"
}
caratteristiche è una stringa. Ho solo bisogno di TotalSpent in questo. Ho provato con:
val features = StructType(
Array(StructField("totalSpent",LongType,true),
StructField("movies",LongType,true)
))
val schema = StructType(Array(
StructField("requestId",StringType,true),
StructField("name",StringType,true),
StructField("features",features,true),
)
)
val records = sqlContext.read.schema(schema).json(filePath)
Poiché ogni richiesta ha una stringa di caratteristiche JSON. Ma questo mi dà errore.
root
|-- requestId: string (nullable = true)
|-- features: string (nullable = true)
|-- name: string (nullable = true)
Posso usare parallelizzare all'interno StructField durante la creazione dello schema:
Quando ho provato con
val records = sqlContext.jsonFile(filePath)
records.printSchema
mi mostra? Ho provato con:
I first tried with :
val customer = StructField("features",StringType,true)
val events = sc.parallelize(customer :: Nil)
val schema = StructType(Array(
StructField("requestId",StringType,true),
StructField("name", StructType(events, true),true),
StructField("features",features,true),
)
)
Questo mi dà errore pure. provato anche:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show
This gives me :
<console>:78: error: object liftweb is not a member of package net
import net.liftweb.json.parse
provato:
ho provato con:
val parseJson = udf((s: String) => {
sqlContext.read.json(s)
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show
Ma ancora una volta l'errore.
provato:
import org.json4s._
import org.json4s.jackson.JsonMethods._
val parseJson = udf((s: String) => {
parse(s)
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show
ma mi dà:
java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
Questo mi dà il corretto schema (in base alla risposta data da zero323:
val extractFeatures = udf((features: String) => Try {
implicit val formats = DefaultFormats
parse(features).extract[Features]
}.toOption)
val parsed = records.withColumn("features", extractFeatures($"features"))
parsed.printSchema
Ma quando interrogo:
val value = parsed.filter($"requestId" === "232323").select($"features.totalSpent")
value.show gives null
.
@ zero323: Ciao, ho attraversato l'altra domanda u rispose, ma non lo faccio abbastanza capisco. Puoi spiegarmi per favore? Sono molto nuovo a scintillare. – Swetha
Quale parte non è chiara? È una risposta lunga che mostra metodi diversi a seconda dei requisiti e della versione. Prendi in considerazione una soluzione particolare? – zero323
@ zero323 Ho provato ad usare il parallelismo sul mio schema perché utilizzo lo schema per ottenere altri valori, quindi preferisco usarlo nello schema ma dà errore. Sono propenso a UDF per analizzare JSON, in cui hai citato case class KV (k: String, v: Int) È a causa del modo in cui il suo json è formattato? – Swetha