Se non siete sicuri di ciò che sta accadendo è meglio seguire i tipi Tralasciando implicita ClassTag
per brevità iniziamo con qualcosa di simile
abstract class RDD[T] extends Serializable with Logging
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
Se si ignora tutti i parametri aggiuntivi vedrai che aggregate
è una funzione che mappa RDD[T]
-U
. Significa che il tipo di valori nell'input RDD
non deve essere uguale al tipo del valore di output. Quindi è chiaramente diverso rispetto ad esempio reduce
:
def reduce(func: (T, T) ⇒ T): T
o fold
:
def fold(zeroValue: T)(op: (T, T) => T): T
Lo stesso che fold
, aggregate
richiede un zeroValue
. Come sceglierlo? Dovrebbe essere un elemento di identità (neutro) rispetto a combOp
.
si hanno anche per fornire due funzioni:
seqOp
che mappa (U, T)
-U
combOp
che mappa da (U, U)
a U
Proprio sulla base di questo firme si dovrebbe già vedere che solo seqOp
può accedere ai dati non elaborati. Richiede un valore di tipo U
un altro di tipo T
e restituisce un valore di tipo U
. Nel tuo caso si tratta di una funzione con una successiva firma
((Int, Int), Int) => (Int, Int)
A questo punto probabilmente sospetta che viene utilizzato per un certo tipo di operazione fold-like.
La seconda funzione accetta due argomenti di tipo U
e restituisce un valore di tipo U
. Come affermato in precedenza, dovrebbe essere chiaro che non tocca i dati originali e può operare solo sui valori già elaborati da seqOp
. Nel tuo caso questa funzione ha una firma come segue:
((Int, Int), (Int, Int)) => (Int, Int)
Quindi come possiamo ottenere tutto questo insieme?
Prima ogni partizione viene aggregato utilizzando standard di Iterator.aggregate
con zeroValue
, seqOp
e combOp
passato come z
, seqop
e combop
respectivelly.Poiché InterruptibleIterator
utilizzati internamente non prevale aggregate
va eseguito come un semplice foldLeft(zeroValue)(seqOp)
Successiva risultati parziali raccolti da ciascuna partizione vengono aggregati utilizzando combOp
lascia supporre che l'input RDD ha tre partizioni con successivo distribuzione dei valori:
Iterator(1, 2)
Iterator(2, 3)
Iterator()
Ci si può aspettare che l'esecuzione, ignorando ordine assoluto, sarà equivalente a qualcosa di simile:
val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
.map(_.foldLeft((0, 0))(seqOp))
.reduce(combOp)
foldLeft
per una singola partizione può apparire così:
Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
e su tutte le partizioni
Seq((3,2), (6,2), (0,0))
che ha unito vi darà osservato risultato:
(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
In generale si tratta di un modello comune, troverete tutto Spark in cui si passa valore neutro, una funzione utilizzata per elaborare i valori per partizione e una funzione usato per unire aggregati parziali da diverse partizioni. Alcuni altri esempi includono:
aggregateByKey
- Definito dall'utente funzioni di aggregazione
Aggregators
su Spark Datasets
.