Ho implementato una soluzione al gruppo RDD[K, V]
dalla chiave e per calcolare i dati in base a ciascun gruppo (K, RDD[V])
, utilizzando partitionBy
e Partitioner
. Tuttavia, non sono sicuro se sia davvero efficiente e mi piacerebbe avere il tuo punto di vista.Utilizzando PartitionBy per dividere ed efficiente calcolare gruppi RDD con tasto
Ecco un caso di esempio: in base a un elenco di [K: Int, V: Int]
, calcolare la V
s significano per ogni gruppo di K
, sapendo che dovrebbe essere distribuito e che V
valori può essere molto grande. Questo dovrebbe dare:
List[K, V] => (K, mean(V))
Il semplice classe di partizionamento:
class MyPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
Il codice di partizione:
val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))
val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()
p.foreachPartition(x => {
try {
val r = sc.parallelize(x.toList)
val id = r.first() //get the K partition id
val v = r.map(x => x._2)
println(id._1 + "->" + mean(v))
} catch {
case e: UnsupportedOperationException => 0
}
})
L'output è:
1->13, 2->4, 3->7
Le mie domande sono:
- cosa succede realmente quando si chiama
partitionBy
? (scusate, non ho trovato abbastanza specifiche su di esso) - È davvero efficiente mappare per partizione, sapendo che nel mio caso di produzione non sarebbero troppe le chiavi (come 50 per il campione) di molti valori (come 1 milione per campione)
- Qual è il costo di
paralellize(x.toList)
? È coerente farlo? (Ho bisogno di un in input dimean()
) - Come faresti da solo?
saluti
grazie per la risposta, ovviamente non può funzionare, non ho tutti i riflessi dei trucchi per la codifica delle scintille e sono stato rovinato dal mio jvm locale. Tuttavia, in realtà non ho bisogno di calcolare la media, ma un complesso metodo ml, e ho bisogno di un RDD [Vettore]. Come posso ottenere un elenco di (chiave, RDD [Vettore]) da un RDD univoco [Int, Int]? Non ho trovato una soluzione. – Seb
Penso che questo sia un argomento simile allora: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 Non sono sicuro di come si desidera rendere 'Vector's da 'Int's. Ma se si desidera ottenere un RDD per chiave, è necessario dividere l'RDD originale e questo è discusso nella risposta collegata. Se non ti dà la risposta, ti suggerisco di fare un'altra domanda, magari con una spiegazione chiara e di alto livello di ciò che vuoi fare. –