Sto provando a definire una UserDefinedAggregateFunction (UDAF) in Spark, che conta il numero di occorrenze per ogni valore univoco in una colonna di un gruppo.Perché la mappa mutevole diventa immutabile automaticamente in UserDefinedAggregateFunction (UDAF) in Spark
Questo è un esempio: Supponiamo che io sono un dataframe df
come questo,
+----+----+
|col1|col2|
+----+----+
| a| a1|
| a| a1|
| a| a2|
| b| b1|
| b| b2|
| b| b3|
| b| b1|
| b| b1|
+----+----+
avrò un DistinctValues UDAF
val func = new DistinctValues
Poi applicarlo al dataframe df
val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV"))
Mi aspetto di avere qualcosa di simile e questo:
+----+--------------------------+
|col1|DV |
+----+--------------------------+
| a| Map(a1->2, a2->1) |
| b| Map(b1->3, b2->1, b3->1)|
+----+--------------------------+
Così mi è venuto fuori con un'UDAF come questo,
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import org.apache.spark.sql.types.LongType
import Array._
class DistinctValues extends UserDefinedAggregateFunction {
def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)
def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil)
def dataType: DataType = MapType(StringType, LongType)
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = scala.collection.mutable.Map()
}
def update(buffer: MutableAggregationBuffer, input: Row) : Unit = {
val str = input.getAs[String](0)
var mp = buffer.getAs[scala.collection.mutable.Map[String, Long]](0)
var c:Long = mp.getOrElse(str, 0)
c = c + 1
mp.put(str, c)
buffer(0) = mp
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = {
var mp1 = buffer1.getAs[scala.collection.mutable.Map[String, Long]](0)
var mp2 = buffer2.getAs[scala.collection.mutable.Map[String, Long]](0)
mp2 foreach {
case (k ,v) => {
var c:Long = mp1.getOrElse(k, 0)
c = c + v
mp1.put(k ,c)
}
}
buffer1(0) = mp1
}
def evaluate(buffer: Row): Any = {
buffer.getAs[scala.collection.mutable.Map[String, LongType]](0)
}
}
Poi ho questa funzione sul mio dataframe,
val func = new DistinctValues
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV"))
Ha dato tale errore,
func: DistinctValues = [email protected]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 884, ip-172-31-22-166.ec2.internal): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map
at $iwC$$iwC$DistinctValues.update(<console>:39)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:431)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:187)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:180)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:116)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Sembra nello update(buffer: MutableAggregationBuffer, input: Row)
metodo, la variabile buffer
è un immutable.Map
, il programma stanco per gettarlo ai mutable.Map
,
ma ho usato mutable.Map
per inizializzare buffer
variabile nel initialize(buffer: MutableAggregationBuffer, input:Row)
metodo. È la stessa variabile passata al metodo update
? E anche buffer
è mutableAggregationBuffer
, quindi dovrebbe essere mutabile, giusto?
Perché la mia mappa mutabile è diventata immutabile? Qualcuno sa che cosa è successo?
Ho davvero bisogno di una mappa mutabile in questa funzione per completare l'operazione. So che c'è una soluzione alternativa per creare una mappa mutabile dalla mappa immutabile, quindi aggiornarla. Ma voglio davvero sapere perché il mutabile si trasforma in uno immutabile nel programma automaticamente, non ha senso per me.
Bella cattura! Hmm, il 'MapyType' in' StructType' può essere il caso. Ma non c'è altro tipo di mappa mutabile in 'spark.sql.types', a meno che non definisca il mio. –
Come ho detto, non farlo - basta usare una 'mappa' immutabile. 'mp = mp + (k -> c)' su una 'Mappa' immutabile ti dà la stessa funzionalità di' mp.put (k, c) 'su una' Mappa' mutabile –
'mp = mp + (k -> c)' funziona! Sono nuovo di scala, non sapevo che si potesse manipolare un tipo di dati immutabile come questo. Grazie mille! –