2015-01-24 16 views
11

ho lanciato il mio gruppo in questo modo:Spark: la strategia Repartition dopo la lettura di file di testo

/usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster--num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar 

La prima cosa che faccio è leggere un file di testo grande, e ritengo che:

val file = sc.textFile("/path/to/file.txt.gz") 
println(file.count()) 

Quando facendo ciò, vedo che solo uno dei miei nodi sta effettivamente leggendo il file ed eseguendo il conteggio (perché vedo solo un'attività). È previsto? Dovrei ripartizionare il mio RDD in seguito, o quando uso le funzioni di riduzione della mappa, Spark lo farà per me?

+0

Quali sono le tue "defaultMinPartitions"? Come dice chiaramente il documento, textFile accetta un numero facoltativo di parametro partizioni, che per impostazione predefinita è quello. –

+0

My defaultMinPartitions è maggiore di uno. Sembra che non sia possibile forzare un numero specificato di partizioni, perché è solo un file di testo ... in esecuzione .... val file = sc.textFile ("/ percorso/to/file.txt.gz", 8) println (file.partitions.length) restituisce 1 – Stephane

+0

Bene, deve fare la lettura in un posto, perché è intrinsecamente seriale. Ma non riesco a capire perché avrebbe avuto quel parametro opzionale se non avesse fatto _qualcosa_. –

risposta

20

Sembra che tu stia lavorando con un file gzip.

Citando my answer here:

Credo che hai raggiunto un problema abbastanza tipico con i file compressi con gzip, nel senso che non possono essere caricati in parallelo. Più precisamente, , un singolo file gzip non può essere caricato in parallelo da più attività, quindi Spark lo caricherà con 1 attività e quindi fornirà un RDD con 1 partizione.

È necessario ripartizionare esplicitamente l'RDD dopo averlo caricato in modo che più attività possano essere eseguite parallelamente.

Ad esempio:

val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3) 
println(file.count()) 

Per quanto riguarda i commenti sul tuo domanda, l'impostazione minPartitions non aiuta qui motivo è perché a gzipped file is not splittable, quindi Spark utilizzerà sempre 1 compito di leggere il file.

Se si imposta minPartitions durante la lettura di un normale file di testo o un file compresso con un formato di compressione splittabile come bzip2, si vedrà che Spark distribuirà effettivamente quel numero di attività in parallelo (fino al numero di core disponibili nel tuo cluster) per leggere il file.

+0

Grazie! Consiglieresti bzip2 su gzip allora? Se carico frequentemente quel file, quale dovrebbe essere la mia strategia per ottimizzare ogni esecuzione? – Stephane

+0

@Stephane - Dipende dalla quantità di dati in arrivo e dal tempo trascorso dal ripartizionamento dei dati da parte del cluster. Un singolo file gzip potrebbe andare bene. Se l'unico file è troppo grande, è probabile che si disponga anche di più file compressi con gzip (vale a dire la divisione prima della compressione) poiché ogni file gzip può essere caricato in parallelo nello stesso RDD (un compito per file). Questo è probabilmente il percorso di minor resistenza. –

+0

molto molto interessante, grazie! Quindi .gz.001 file divisi o bzip2 ... Sperimenterò con entrambi!Penso che sì, il grande collo di bottiglia è la prima ripartizione, quindi se riesco a dividere i miei file quando arrivano potrebbe risparmiare un po 'di tempo – Stephane