2016-03-15 21 views
5

Sto chiamando una funzione in scala che fornisce un RDD[(Long,Long,Double)] come output.Unione di più RDD generati nel ciclo

def helperfunction(): RDD[(Long, Long, Double)]

io chiamo questa funzione in loop in un'altra parte del codice e voglio unire tutti i RDDs generati. Il ciclo chiamando la funzione simile a questa

for (i <- 1 to n){ 
    val tOp = helperfunction() 
    // merge the generated tOp 
} 

Quello che voglio fare è qualcosa di simile a ciò che StringBuilder farebbe per voi in Java quando si voleva unire le stringhe. Ho guardato le tecniche di RDDs fusione, che indicano per lo più utilizzando la funzione unione come questo

RDD1.union(RDD2) 

ma questo richiede sia RDDs da generare prima di prendere la loro unione. Mi sembra di inizializzare un var RDD1 per accumulare i risultati al di fuori del ciclo for, ma non sono sicuro di come posso inizializzare un RDD vuoto di tipo [(Long,Long,Double)]. Inoltre sto iniziando con la scintilla, quindi non sono nemmeno sicuro se questo è il metodo più elegante per risolvere questo problema.

risposta

4

Invece di usare Vars, è possibile utilizzare paradigmi di programmazione funzionale per ottenere ciò che si vuole:

val rdd = (1 to n).map(x => helperFunction()).reduce(_ union _) 

Inoltre, se hai ancora bisogno di creare un RDD vuoto, si può fare usando:

val empty = sc.emptyRDD[(long, long, String)] 
+0

IIRC non è possibile unire un RDD a un RDD vuoto fino a Spark 2.0. – MrChristine

+0

come si fa se si deve passare l'indice del ciclo alla funzione helper? – G3M

+0

se si desidera passare l'indice del ciclo alla funzione helper, è possibile eseguire una delle seguenti operazioni: 'val rdd = (da 1 a n) .zipWithIndex.map {case (x, index) => helperFunction (i)} .reduce (_ union _) ' Naturalmente, in questo caso non è necessario in quanto abbiamo una raccolta incrementale intera ma è possibile sostituire' (1 a n) 'da qualsiasi raccolta –

2

Hai ragione che questo potrebbe non essere il modo ottimale per farlo, ma avremmo bisogno di più informazioni su ciò che stai cercando di realizzare con la generazione di un nuovo RDD con ogni chiamata alla tua funzione di supporto.

È possibile definire 1 RDD prima del ciclo e assegnarlo a una variabile, quindi eseguirlo nel ciclo. Ecco un esempio:

val rdd = sc.parallelize(1 to 100) 
val rdd_tuple = rdd.map(x => (x.toLong, (x*10).toLong, x.toDouble)) 
var new_rdd = rdd_tuple 
println("Initial RDD count: " + new_rdd.count()) 
for (i <- 2 to 4) { 
    new_rdd = new_rdd.union(rdd_tuple) 
} 
println("New count after loop: " + new_rdd.count()) 
+0

Qualsiasi corpo ha JavaCode per lo stesso scenario? – Neethu