2016-04-14 14 views
5

Sto provando a definire una UserDefinedAggregateFunction (UDAF) in Spark, che conta il numero di occorrenze per ogni valore univoco in una colonna di un gruppo.Perché la mappa mutevole diventa immutabile automaticamente in UserDefinedAggregateFunction (UDAF) in Spark

Questo è un esempio: Supponiamo che io sono un dataframe df come questo,

+----+----+ 
|col1|col2| 
+----+----+ 
| a| a1| 
| a| a1| 
| a| a2| 
| b| b1| 
| b| b2| 
| b| b3| 
| b| b1| 
| b| b1| 
+----+----+ 

avrò un DistinctValues ​​UDAF

val func = new DistinctValues 

Poi applicarlo al dataframe df

val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV")) 

Mi aspetto di avere qualcosa di simile e questo:

+----+--------------------------+ 
|col1|DV      | 
+----+--------------------------+ 
| a| Map(a1->2, a2->1)  | 
| b| Map(b1->3, b2->1, b3->1)| 
+----+--------------------------+ 

Così mi è venuto fuori con un'UDAF come questo,

import org.apache.spark.sql.expressions.MutableAggregationBuffer 
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types.StructField 
import org.apache.spark.sql.types.DataType 
import org.apache.spark.sql.types.ArrayType 
import org.apache.spark.sql.types.StringType 
import org.apache.spark.sql.types.MapType 
import org.apache.spark.sql.types.LongType 
import Array._ 

class DistinctValues extends UserDefinedAggregateFunction { 
    def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil) 

    def dataType: DataType = MapType(StringType, LongType) 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = scala.collection.mutable.Map() 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) : Unit = { 
    val str = input.getAs[String](0) 
    var mp = buffer.getAs[scala.collection.mutable.Map[String, Long]](0) 
    var c:Long = mp.getOrElse(str, 0) 
    c = c + 1 
    mp.put(str, c) 
    buffer(0) = mp 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = { 
    var mp1 = buffer1.getAs[scala.collection.mutable.Map[String, Long]](0) 
    var mp2 = buffer2.getAs[scala.collection.mutable.Map[String, Long]](0) 
    mp2 foreach { 
     case (k ,v) => { 
      var c:Long = mp1.getOrElse(k, 0) 
      c = c + v 
      mp1.put(k ,c) 
     } 
    } 
    buffer1(0) = mp1 
    } 

    def evaluate(buffer: Row): Any = { 
     buffer.getAs[scala.collection.mutable.Map[String, LongType]](0) 
    } 
} 

Poi ho questa funzione sul mio dataframe,

val func = new DistinctValues 
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV")) 

Ha dato tale errore,

func: DistinctValues = [email protected] 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 884, ip-172-31-22-166.ec2.internal): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map 
at $iwC$$iwC$DistinctValues.update(<console>:39) 
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:431) 
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:187) 
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:180) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:116) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Sembra nello update(buffer: MutableAggregationBuffer, input: Row) metodo, la variabile buffer è un immutable.Map, il programma stanco per gettarlo ai mutable.Map,

ma ho usato mutable.Map per inizializzare buffer variabile nel initialize(buffer: MutableAggregationBuffer, input:Row) metodo. È la stessa variabile passata al metodo update? E anche buffer è mutableAggregationBuffer, quindi dovrebbe essere mutabile, giusto?

Perché la mia mappa mutabile è diventata immutabile? Qualcuno sa che cosa è successo?

Ho davvero bisogno di una mappa mutabile in questa funzione per completare l'operazione. So che c'è una soluzione alternativa per creare una mappa mutabile dalla mappa immutabile, quindi aggiornarla. Ma voglio davvero sapere perché il mutabile si trasforma in uno immutabile nel programma automaticamente, non ha senso per me.

risposta

4

Credete che sia il MapType nel vostro StructType. buffer detiene pertanto un valore Map, che sarebbe immutabile.

È possibile convertire, ma perché non lasciarlo immutabile e fare questo:

mp = mp + (k -> c) 

per aggiungere una voce alla immutabili Map? esempio

Lavorare sotto:

class DistinctValues extends UserDefinedAggregateFunction { 
    def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("_2", IntegerType) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil) 

    def dataType: DataType = MapType(StringType, LongType) 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = Map() 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) : Unit = { 
    val str = input.getAs[String](0) 
    var mp = buffer.getAs[Map[String, Long]](0) 
    var c:Long = mp.getOrElse(str, 0) 
    c = c + 1 
    mp = mp + (str -> c) 
    buffer(0) = mp 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = { 
    var mp1 = buffer1.getAs[Map[String, Long]](0) 
    var mp2 = buffer2.getAs[Map[String, Long]](0) 
    mp2 foreach { 
     case (k ,v) => { 
      var c:Long = mp1.getOrElse(k, 0) 
      c = c + v 
      mp1 = mp1 + (k -> c) 
     } 
    } 
    buffer1(0) = mp1 
    } 

    def evaluate(buffer: Row): Any = { 
     buffer.getAs[Map[String, LongType]](0) 
    } 
} 
+0

Bella cattura! Hmm, il 'MapyType' in' StructType' può essere il caso. Ma non c'è altro tipo di mappa mutabile in 'spark.sql.types', a meno che non definisca il mio. –

+0

Come ho detto, non farlo - basta usare una 'mappa' immutabile. 'mp = mp + (k -> c)' su una 'Mappa' immutabile ti dà la stessa funzionalità di' mp.put (k, c) 'su una' Mappa' mutabile –

+0

'mp = mp + (k -> c)' funziona! Sono nuovo di scala, non sapevo che si potesse manipolare un tipo di dati immutabile come questo. Grazie mille! –