2015-05-11 8 views
11

In particolare, se dicoEsiste un "Spiega RDD" scintille

rdd3 = rdd1.join(rdd2) 

poi quando chiamo rdd3.collect, a seconda del Partitioner utilizzato, sia dati vengono spostati tra le partizioni nodi, o è fatto il join localmente su ogni partizione (o, per quel che ne so, qualcos'altro interamente). Dipende da ciò che la carta RDD chiama dipendenze "strette" e "larghe", ma chissà quanto è buono l'ottimizzatore nella pratica.

Ad ogni modo, posso in qualche modo ricavare dall'output di traccia quale cosa è realmente accaduta, ma sarebbe bello chiamare rdd3.explain.

Esiste una cosa del genere?

risposta

14

Penso che toDebugString placheranno la vostra curiosità.

scala> val data = sc.parallelize(List((1,2))) 
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21 

scala> val joinedData = data join data 
joinedData: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[11] at join at <console>:23 

scala> joinedData.toDebugString 
res4: String = 
(8) MapPartitionsRDD[11] at join at <console>:23 [] 
| MapPartitionsRDD[10] at join at <console>:23 [] 
| CoGroupedRDD[9] at join at <console>:23 [] 
+-(8) ParallelCollectionRDD[8] at parallelize at <console>:21 [] 
+-(8) ParallelCollectionRDD[8] at parallelize at <console>:21 [] 

Ogni rientro è una fase, quindi questo dovrebbe funzionare come due stadi.

Inoltre, l'ottimizzatore è abbastanza decente, ma vi suggerirei di usare DataFrames se si utilizza 1.3+ come l'ottimizzatore non v'è ancora meglio, in molti casi :)

+2

Quella è una cosa di bellezza –

+0

Questo è utile, ma non è il livello di dettaglio che speravo. In particolare, un join che richiede/non richiede un shuffle da 'la stessa uscita a debugString ... anche se per cose come distinte posso chiaramente vedere il passo shuffle. –

+0

Penso che intendo "realizzato per eseguire locale", come in, non dovrebbero essere inviati dati tra i nodi se i partizionatori sono d'accordo. –