2016-05-29 12 views
5

Ho un'applicazione con circa 10 file flat ciascuno del valore di oltre 200 MM + record in essi. La logica aziendale implica l'unione di tutti loro in modo sequenziale.Spark RDD - evitare shuffle - Il partizionamento aiuta a elaborare file enormi?

mio ambiente: 1 master - 3 slave (per i test ho assegnato un 1GB di memoria per ciascun nodo)

La maggior parte del codice appena fa il seguito per ogni join

RDD1 = sc.textFile(file1).mapToPair(..) 

RDD2 = sc.textFile(file2).mapToPair(..) 

join = RDD1.join(RDD2).map(peopleObject) 

Ogni suggerimento per la messa a punto, come ripartizionare, parallelizzare ..? In caso affermativo, le migliori pratiche in venire con un buon numero per il ripartizionamento?

con la configurazione corrente del lavoro prende più di un'ora e vedo la scrittura casuale per quasi tutti i file è> 3 GB

+0

file memorizzati su HDFS? quante partizioni hai? – marios

+0

n. Sono in aws s3 e non hanno ancora eseguito alcuna partizione, ma potrebbero esserci scintille interne che utilizzano il parallelismo predefinito. – sve

+0

Puoi fare RDD1.partitions.size o eseguire "RDD1.toDebugString" e vedere qual è il numero di partizioni che hai? – marios

risposta

2

Se siamo sempre associarsi ad uno RDD (diciamo rdd1) con tutti gli altri, l'idea è partizionare quel RDD e poi persisterlo.

Ecco implementazione sudo-Scala (può essere facilmente convertito in Python o Java):

val rdd1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(200)).cache() 

Fino a qui abbiamo rdd1 da hash in 200 partizioni. La prima volta che verrà valutata verrà mantenuta (memorizzata nella cache).

Ora leggiamo altri due rdds e li uniamo.

val rdd2 = sc.textFile(file2).mapToPair(..) 
val join1 = rdd1.join(rdd2).map(peopleObject) 
val rdd3 = sc.textFile(file3).mapToPair(..) 
val join2 = rdd1.join(rdd3).map(peopleObject) 

Si noti che per gli RDD di ricontrazione non li partizioniamo e non li memorizziamo nella cache.

Spark vedrà che rdd1 è già una partizione con hash e utilizzerà le stesse partizioni per tutti i join rimanenti. Quindi rdd2 e rdd3 mescoleranno le loro chiavi alle stesse posizioni in cui si trovano le chiavi di rdd1.

Per rendere più chiaro, supponiamo di non eseguire la partizione e di utilizzare lo stesso codice mostrato dalla domanda; Ogni volta che facciamo un join entrambi i dischi verranno mescolati. Ciò significa che se abbiamo N join per rdd1, la versione non partizionata rimescola rdd1 N volte. L'approccio partizionato riprodurrà rdd1 una sola volta.

+0

Cosa guadagniamo memorizzando nella cache il primo RDD? – axiom

+0

Quando tutte le sue chiavi troveranno la loro casa, rimarranno lì fino a quando non avrai finito con tutte le tue entrate. – marios

+0

'rdd1' sarà materializzato una volta, quando viene chiamato il primo join. D'ora in poi verrà memorizzato nella cache, ma non verrà successivamente utilizzato (come da codice fornito dall'OP). Non abbiamo bisogno di 'rdd1' fino a quando il join è finito. Vedo che hai presentato un caso d'uso leggermente diverso. OP voleva rdd1.join (rdd2) .... join (rddN) IMO. Il caching è indubbiamente utile con il codice presentato nella risposta. – axiom

3

In pratica, con dataset di grandi dimensioni (5, 100G + ciascuno), ho visto che il join funziona meglio quando si co-partizionano tutti gli RDD coinvolti in una serie di join prima di iniziare a unirli.

RDD1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(2048)) 

RDD2 = sc.textFile(file2).mapToPair(..).partitionBy(new HashPartitioner(2048)) 
. 
. 
. 
RDDN = sc.textFile(fileN).mapToPair(..).partitionBy(new HashPartitioner(2048)) 

//start joins 

RDD1.join(RDD2)...join(RDDN)


Nota a margine: Mi riferisco a questo tipo di aderire come un albero di join (ogni RDD un tempo utilizzato). La logica è presentato nella seguente bella foto tratte da Spark-UI:

enter image description here

+0

/@ mario - Grazie per la breve spiegazione. Infatti ho entrambi i casi d'uso in sequenza unire RDD1 con gli altri RDD così come unire RDDn1, RDDn2 e i risultati con RDD1. Osservando gli esempi forniti, la mia comprensione è che, le prestazioni sono migliori quando partiziono tutti gli RDD e memorizzo nella cache l'RDD primario. fammi sapere se ho capito bene – sve

+0

@SpringStarter Si prega di notare che nel caso presentato sopra, il caching sta andando davvero male, dato che stai sprecando spazio su qualcosa che non è necessario. Tuttavia, per l'altro caso d'uso che hai menzionato, il caching sarebbe davvero d'aiuto. – axiom