2015-02-04 1 views
28

Come posso interrogare un RDD con tipi complessi come mappe/matrici? per esempio, quando stavo scrivendo questo codice di prova:Interrogazione di Spark SQL DataFrame con tipi complessi

case class Test(name: String, map: Map[String, String]) 
val map = Map("hello" -> "world", "hey" -> "there") 
val map2 = Map("hello" -> "people", "hey" -> "you") 
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2))) 

ho anche se la sintassi sarebbe qualcosa di simile:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world") 

o

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world") 

ma ho

Can't access nested field in type MapType(StringType,StringType,true)

a nd

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes

rispettivamente.

+1

ne dite di accettare che * * tomo di una risposta da @ zero323? – javadba

risposta

79

Dipende su un tipo di colonna.Iniziamo con alcuni dati dummy:

import org.apache.spark.sql.functions.{udf, lit} 
import scala.util.Try 

case class SubRecord(x: Int) 
case class ArrayElement(foo: String, bar: Int, vals: Array[Double]) 
case class Record(
    an_array: Array[Int], a_map: Map[String, String], 
    a_struct: SubRecord, an_array_of_structs: Array[ArrayElement]) 


val df = sc.parallelize(Seq(
    Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1), 
     Array(
      ArrayElement("foo", 1, Array(1.0, 2.0)), 
      ArrayElement("bar", 2, Array(3.0, 4.0)))), 
    Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2), 
     Array(ArrayElement("foz", 3, Array(5.0, 6.0)), 
       ArrayElement("baz", 4, Array(7.0, 8.0)))) 
)).toDF 
df.registerTempTable("df") 
df.printSchema 

// root 
// |-- an_array: array (nullable = true) 
// | |-- element: integer (containsNull = false) 
// |-- a_map: map (nullable = true) 
// | |-- key: string 
// | |-- value: string (valueContainsNull = true) 
// |-- a_struct: struct (nullable = true) 
// | |-- x: integer (nullable = false) 
// |-- an_array_of_structs: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- foo: string (nullable = true) 
// | | |-- bar: integer (nullable = false) 
// | | |-- vals: array (nullable = true) 
// | | | |-- element: double (containsNull = false) 
  • colonne di matrice:

    • Column.getItem metodo

      df.select($"an_array".getItem(1)).show 
      
      // +-----------+ 
      // |an_array[1]| 
      // +-----------+ 
      // |   2| 
      // |   5| 
      // +-----------+ 
      
    • alveare brac sintassi cati:

      sqlContext.sql("SELECT an_array[1] FROM df").show 
      
      // +---+ 
      // |_c0| 
      // +---+ 
      // | 2| 
      // | 5| 
      // +---+ 
      
    • un UDF

      val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption) 
      
      df.select(get_ith($"an_array", lit(1))).show 
      
      // +---------------+ 
      // |UDF(an_array,1)| 
      // +---------------+ 
      // |    2| 
      // |    5| 
      // +---------------+ 
      
  • mappa colonne

    • utilizzando Column.getField metodo:

      df.select($"a_map".getField("foo")).show 
      
      // +----------+ 
      // |a_map[foo]| 
      // +----------+ 
      // |  bar| 
      // |  null| 
      // +----------+ 
      
    • utilizzando Hive parentesi sintassi:

      sqlContext.sql("SELECT a_map['foz'] FROM df").show 
      
      // +----+ 
      // | _c0| 
      // +----+ 
      // |null| 
      // | baz| 
      // +----+ 
      
    • utilizzando un percorso completo con sintassi del punto:

      df.select($"a_map.foo").show 
      
      // +----+ 
      // | foo| 
      // +----+ 
      // | bar| 
      // |null| 
      // +----+ 
      
    • utilizzando un UDF

      val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k)) 
      
      df.select(get_field($"a_map", lit("foo"))).show 
      
      // +--------------+ 
      // |UDF(a_map,foo)| 
      // +--------------+ 
      // |   bar| 
      // |   null| 
      // +--------------+ 
      
  • colonne struct usando pieno path with dot syntax:

    • con API dataframe

      df.select($"a_struct.x").show 
      
      // +---+ 
      // | x| 
      // +---+ 
      // | 1| 
      // | 2| 
      // +---+ 
      
    • con SQL prime

      sqlContext.sql("SELECT a_struct.x FROM df").show 
      
      // +---+ 
      // | x| 
      // +---+ 
      // | 1| 
      // | 2| 
      // +---+ 
      
  • campi all'interno array di structs può accedere usando dot-sintassi, nomi e degli standard Column metodi :

    df.select($"an_array_of_structs.foo").show 
    
    // +----------+ 
    // |  foo| 
    // +----------+ 
    // |[foo, bar]| 
    // |[foz, baz]| 
    // +----------+ 
    
    sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show 
    
    // +---+ 
    // |_c0| 
    // +---+ 
    // |foo| 
    // |foz| 
    // +---+ 
    
    df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show 
    
    // +------------------------------+ 
    // |an_array_of_structs.vals[1][1]| 
    // +------------------------------+ 
    // |       4.0| 
    // |       8.0| 
    // +------------------------------+ 
    
  • È possibile accedere ai campi definiti dall'utente (UDT) utilizzando le UDF. Vedi SparkSQL referencing attributes of UDT per i dettagli.

Note:

  • seconda versione Spark alcuni di questi metodi può essere disponibile solo con HiveContext. Le UDF dovrebbero funzionare indipendentemente dalla versione con entrambi gli standard SQLContext e HiveContext.
  • valori nidificati in generale sono cittadini di seconda classe. Non tutte le operazioni tipiche sono supportate su campi nidificati. Secondo un contesto che potrebbe essere meglio per appiattire lo schema e/o esplodere collezioni

    df.select(explode($"an_array_of_structs")).show 
    
    // +--------------------+ 
    // |     col| 
    // +--------------------+ 
    // |[foo,1,WrappedArr...| 
    // |[bar,2,WrappedArr...| 
    // |[foz,3,WrappedArr...| 
    // |[baz,4,WrappedArr...| 
    // +--------------------+ 
    
  • sintassi del punto può essere combinato con carattere jolly (*) per selezionare (eventualmente multipla) campi senza specificare i nomi in modo esplicito:

    df.select($"a_struct.*").show 
    // +---+ 
    // | x| 
    // +---+ 
    // | 1| 
    // | 2| 
    // +---+ 
    
+5

puoi fornire qualche dettaglio? lol –

+0

È possibile recuperare tutti gli elementi in un array struct? È qualcosa di simile possibile .. sqlContext.sql ("SELECT an_array_of_structs [0] .foo FROM df"). Show – user1384205

+1

Questa dovrebbe essere la risposta accettata. –

2

Una volta si converte a DF, u può semplicemente recuperare i dati da

val rddRow= rdd.map(kv=>{ 
    val k = kv._1 
    val v = kv._2 
    Row(k, v) 
    }) 

val myFld1 = StructField("name", org.apache.spark.sql.types.StringType, true) 
val myFld2 = StructField("map", org.apache.spark.sql.types.MapType(StringType, StringType), true) 
val arr = Array(myFld1, myFld2) 
val schema = StructType(arr) 
val rowrddDF = sqc.createDataFrame(rddRow, schema) 
rowrddDF.registerTempTable("rowtbl") 
val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one")) 
or 
val rowrddDFFinal = rowrddDF.select("map.one") 
+0

quando provo questo ottengo 'errore: il valore _1 non è un membro di org.apache.spark.sql.Row' – Paul

0

qui era quello che ho fatto e ha funzionato

case class Test(name: String, m: Map[String, String]) 
val map = Map("hello" -> "world", "hey" -> "there") 
val map2 = Map("hello" -> "people", "hey" -> "you") 
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2))) 
val rdddf = rdd.toDF 
rdddf.registerTempTable("mytable") 
sqlContext.sql("select m.hello from mytable").show 

Risultati

+------+ 
| hello| 
+------+ 
| world| 
|people| 
+------+