2015-02-09 6 views
7

Ho implementato una soluzione al gruppo RDD[K, V] dalla chiave e per calcolare i dati in base a ciascun gruppo (K, RDD[V]), utilizzando partitionBy e Partitioner. Tuttavia, non sono sicuro se sia davvero efficiente e mi piacerebbe avere il tuo punto di vista.Utilizzando PartitionBy per dividere ed efficiente calcolare gruppi RDD con tasto

Ecco un caso di esempio: in base a un elenco di [K: Int, V: Int], calcolare la V s significano per ogni gruppo di K, sapendo che dovrebbe essere distribuito e che V valori può essere molto grande. Questo dovrebbe dare:

List[K, V] => (K, mean(V)) 

Il semplice classe di partizionamento:

class MyPartitioner(maxKey: Int) extends Partitioner { 

    def numPartitions = maxKey 

    def getPartition(key: Any): Int = key match { 
     case i: Int if i < maxKey => i 
    } 
    } 

Il codice di partizione:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)) 

     val rdd = sc.parallelize(l) 
     val p = rdd.partitionBy(new MyPartitioner(4)).cache() 

     p.foreachPartition(x => { 
     try { 
      val r = sc.parallelize(x.toList) 
      val id = r.first() //get the K partition id 
      val v = r.map(x => x._2) 
      println(id._1 + "->" + mean(v)) 
     } catch { 
      case e: UnsupportedOperationException => 0 
     } 
     }) 

L'output è:

1->13, 2->4, 3->7

Le mie domande sono:

  1. cosa succede realmente quando si chiama partitionBy? (scusate, non ho trovato abbastanza specifiche su di esso)
  2. È davvero efficiente mappare per partizione, sapendo che nel mio caso di produzione non sarebbero troppe le chiavi (come 50 per il campione) di molti valori (come 1 milione per campione)
  3. Qual è il costo di paralellize(x.toList)? È coerente farlo? (Ho bisogno di un in input di mean())
  4. Come faresti da solo?

saluti

risposta

4

Il codice non dovrebbe funzionare. Non è possibile passare l'oggetto SparkContext agli executors. (Non è Serializable.) Inoltre non vedo perché dovresti.

Per calcolare la media, è necessario calcolare la somma e il conteggio e prendere il loro rapporto. Il partizionatore predefinito andrà bene.

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = { 
    case class SumCount(sum: Double, count: Double) 
    val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0), 
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count)) 
    sumCounts.map(sc => sc.sum/sc.count) 
} 

Questo è un calcolo a passaggio singolo efficiente che generalizza bene.

+0

grazie per la risposta, ovviamente non può funzionare, non ho tutti i riflessi dei trucchi per la codifica delle scintille e sono stato rovinato dal mio jvm locale. Tuttavia, in realtà non ho bisogno di calcolare la media, ma un complesso metodo ml, e ho bisogno di un RDD [Vettore]. Come posso ottenere un elenco di (chiave, RDD [Vettore]) da un RDD univoco [Int, Int]? Non ho trovato una soluzione. – Seb

+0

Penso che questo sia un argomento simile allora: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 Non sono sicuro di come si desidera rendere 'Vector's da 'Int's. Ma se si desidera ottenere un RDD per chiave, è necessario dividere l'RDD originale e questo è discusso nella risposta collegata. Se non ti dà la risposta, ti suggerisco di fare un'altra domanda, magari con una spiegazione chiara e di alto livello di ciò che vuoi fare. –