2016-05-04 20 views
9

Sono uno studente di Apache Spark e ho trovato un RDD azione aggregate che non ho idea di come funzioni. Qualcuno può precisare e spiegare in dettaglio passo dopo passo come si è arrivati ​​al risultato qui sotto per il codice quiRDD Aggregate in spark

RDD input = {1,2,3,3} 

RDD Aggregate function : 

rdd.aggregate((0, 0)) 
((x, y) => 
(x._1 + y, x._2 + 1), 
(x, y) => 
(x._1 + y._1, x._2 + y._2)) 

output : {9,4} 

Grazie

risposta

18

Se non siete sicuri di ciò che sta accadendo è meglio seguire i tipi Tralasciando implicita ClassTag per brevità iniziamo con qualcosa di simile

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

Se si ignora tutti i parametri aggiuntivi vedrai che aggregate è una funzione che mappa RDD[T]-U. Significa che il tipo di valori nell'input RDD non deve essere uguale al tipo del valore di output. Quindi è chiaramente diverso rispetto ad esempio reduce:

def reduce(func: (T, T) ⇒ T): T 

o fold:

def fold(zeroValue: T)(op: (T, T) => T): T 

Lo stesso che fold, aggregate richiede un zeroValue. Come sceglierlo? Dovrebbe essere un elemento di identità (neutro) rispetto a combOp.

si hanno anche per fornire due funzioni:

  • seqOp che mappa (U, T)-U
  • combOp che mappa da (U, U) a U

Proprio sulla base di questo firme si dovrebbe già vedere che solo seqOp può accedere ai dati non elaborati. Richiede un valore di tipo U un altro di tipo T e restituisce un valore di tipo U. Nel tuo caso si tratta di una funzione con una successiva firma

((Int, Int), Int) => (Int, Int) 

A questo punto probabilmente sospetta che viene utilizzato per un certo tipo di operazione fold-like.

La seconda funzione accetta due argomenti di tipo U e restituisce un valore di tipo U. Come affermato in precedenza, dovrebbe essere chiaro che non tocca i dati originali e può operare solo sui valori già elaborati da seqOp. Nel tuo caso questa funzione ha una firma come segue:

((Int, Int), (Int, Int)) => (Int, Int) 

Quindi come possiamo ottenere tutto questo insieme?

  1. Prima ogni partizione viene aggregato utilizzando standard di Iterator.aggregate con zeroValue, seqOp e combOp passato come z, seqop e combop respectivelly.Poiché InterruptibleIterator utilizzati internamente non prevale aggregate va eseguito come un semplice foldLeft(zeroValue)(seqOp)

  2. Successiva risultati parziali raccolti da ciascuna partizione vengono aggregati utilizzando combOp

lascia supporre che l'input RDD ha tre partizioni con successivo distribuzione dei valori:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

Ci si può aspettare che l'esecuzione, ignorando ordine assoluto, sarà equivalente a qualcosa di simile:

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1) 
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2) 

Seq(Iterator(1, 2), Iterator(3, 3), Iterator()) 
    .map(_.foldLeft((0, 0))(seqOp)) 
    .reduce(combOp) 

foldLeft per una singola partizione può apparire così:

Iterator(1, 2).foldLeft((0, 0))(seqOp) 
Iterator(2).foldLeft((1, 1))(seqOp) 
(3, 2) 

e su tutte le partizioni

Seq((3,2), (6,2), (0,0)) 

che ha unito vi darà osservato risultato:

(3 + 6 + 0, 2 + 2 + 0) 
(9, 4) 

In generale si tratta di un modello comune, troverete tutto Spark in cui si passa valore neutro, una funzione utilizzata per elaborare i valori per partizione e una funzione usato per unire aggregati parziali da diverse partizioni. Alcuni altri esempi includono:

  • aggregateByKey
  • Definito dall'utente funzioni di aggregazione
  • Aggregators su Spark Datasets.
1

Qui è la mia comprensione per il vostro riferimento:

Immaginate di avere due nodi, uno prendere l'ingresso dei primi due elementi della lista {1,2}, e un altro prende {3, 3}. (La partizione qui solo per un comodo)

Al primo nodo: "(x, y) => (x._1 + y, x._2 + 1)", la prima x è (0 , 0) come dato, e y è il tuo primo elemento 1, e avrai output (0 + 1, 0 + 1), quindi il tuo secondo elemento y = 2, e output (1 + 2, 1 + 1), che è (3, 2)

Al secondo nodo, la stessa procedura avviene in parallelo e avrete (6, 2).

"(x, y) => (x._1 + y._1, x._2 + y._2)", ti dice di unire due nodi e otterrai (9,4)


una cosa vale la pena notare è (0,0) è in realtà aggiunto al risultato lunghezza (RDD) +1 volte.

"scala> rdd.aggregate ((1,1)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x. _1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9) "