Sto chiamando una funzione in scala che fornisce un RDD[(Long,Long,Double)]
come output.Unione di più RDD generati nel ciclo
def helperfunction(): RDD[(Long, Long, Double)]
io chiamo questa funzione in loop in un'altra parte del codice e voglio unire tutti i RDDs generati. Il ciclo chiamando la funzione simile a questa
for (i <- 1 to n){
val tOp = helperfunction()
// merge the generated tOp
}
Quello che voglio fare è qualcosa di simile a ciò che StringBuilder farebbe per voi in Java quando si voleva unire le stringhe. Ho guardato le tecniche di RDDs fusione, che indicano per lo più utilizzando la funzione unione come questo
RDD1.union(RDD2)
ma questo richiede sia RDDs da generare prima di prendere la loro unione. Mi sembra di inizializzare un var RDD1 per accumulare i risultati al di fuori del ciclo for, ma non sono sicuro di come posso inizializzare un RDD vuoto di tipo [(Long,Long,Double)]
. Inoltre sto iniziando con la scintilla, quindi non sono nemmeno sicuro se questo è il metodo più elegante per risolvere questo problema.
IIRC non è possibile unire un RDD a un RDD vuoto fino a Spark 2.0. – MrChristine
come si fa se si deve passare l'indice del ciclo alla funzione helper? – G3M
se si desidera passare l'indice del ciclo alla funzione helper, è possibile eseguire una delle seguenti operazioni: 'val rdd = (da 1 a n) .zipWithIndex.map {case (x, index) => helperFunction (i)} .reduce (_ union _) ' Naturalmente, in questo caso non è necessario in quanto abbiamo una raccolta incrementale intera ma è possibile sostituire' (1 a n) 'da qualsiasi raccolta –