2016-04-29 32 views
13

Sto cercando di scrivere un file parquet su Amazon S3 utilizzando Spark 1.6.1. Il piccolo parquet che sto generando è ~2GB una volta scritto quindi non si tratta di molti dati. Sto cercando di dimostrare Spark come piattaforma che posso usare.L'utilizzo di Spark per scrivere un file parquet su s3 su s3a è molto lento

Fondamentalmente quello che sto andando è impostare un star schema con dataframes, quindi ho intenzione di scrivere quei tavoli sul parquet. I dati provengono da file CSV forniti da un fornitore e sto utilizzando Spark come piattaforma ETL. Attualmente ho un cluster a 3 nodi in ec2(r3.2xlarge) Quindi 120GB di memoria sugli esecutori e 16 core totali.

I file di input ammontano a circa 22 GB e per il momento sto estraendo circa 2 GB di tali dati. Alla fine questo sarà di molti terabyte quando comincio a caricare l'intero set di dati.

Qui è la mia scintilla/Scala pseudocode:

def loadStage(): Unit = { 
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData") 
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter") 
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false") 
    var sqlCtx = new SQLContext(sc) 


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz") 

    //Setup header table/df 
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1") 
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....." 
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) 
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6))) 
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema) 
    header.registerTempTable("header") 
    sqlCtx.cacheTable("header") 


    //Setup fact table/df 
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2") 
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....." 
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) 
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10))) 
    val df = sqlCtx.createDataFrame(records, factSchema) 
    df.registerTempTable("fact") 

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date") 


    println(results.count()) 



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet") 


    } 

Il conteggio dura circa 2 minuti per 465884512 righe. La scrittura per parquet prende 38 minuti

Capisco che il coalesce fa un riordino al driver che fa la scrittura .... ma la quantità di tempo che sta prendendo mi sta facendo pensare che sto facendo qualcosa di seriamente sbagliato . Senza lo coalesce, ci vogliono ancora 15 minuti, il quale IMO è ancora troppo lungo e mi dà un sacco di piccoli file parquet. Mi piacerebbe avere un grande file al giorno di dati che avrò. Ho il codice per fare il partizionamento da un valore di campo, ed è altrettanto lento. Ho anche provato a trasmettere questo a csv e che richiede ~ 1 ora.

Inoltre, in realtà non sto impostando i puntelli durante la presentazione del mio lavoro. Le mie statistiche di console per un posto di lavoro sono:

  • Alive Addetti: 2
  • core in uso: 16 Totale, 16 Usato
  • Memoria in uso: 117,5 GB totali, 107,5 GB Usato
  • Applicazioni: 1 Correre, 5 Completato
  • Driver: 0 corsa, 0 Completato
  • Stato: ALIVE
+1

una coalesce non mescola il driver che mescola tra gli esecutori, ma questo è irrilevante per il problema che si sta vedendo. Stai usando EMR? se è così usa s3: // e non s3a: //. in entrambi i casi su Spark 1.6 dovresti usare Direct OutputCommitter come dice @David. Un altro possibile miglioramento consiste nell'impostare parquet.enable.summary-metadata su false –

+0

L'utilizzo di Alluxio di fronte a S3 lo velocizza? –

risposta

14

Spark defaul Ciò causa una grande quantità di sovraccarico (probabilmente) non necessario durante le operazioni di I/O, specialmente quando si scrive su S3. This article discute questo in modo più approfondito, ma ci sono 2 impostazioni che vorrai considerare di cambiare.

  • Utilizzo di DirectParquetOutputCommitter. Per impostazione predefinita, Spark salverà tutti i dati in una cartella temporanea, quindi li sposterà in seguito.Utilizzando il DirectParquetOutputCommitter farà risparmiare tempo direttamente scrivendo al percorso di uscita S3

    • No longer available in Spark 2.0+
      • Come indicato nel biglietto jira, la soluzione attuale è quella di
        1. Passare il codice per usando s3a e Hadoop 2.7.2+; è meglio tutto, va meglio in Hadoop 2.8, ed è la base per s3guard
        2. Utilizzare il Hadoop FileOutputCommitter e impostare mapreduce.fileoutputcommitter.algorithm.version a 2
  • Disattiva fusione schema. Se la fusione dello schema è attiva, il nodo del driver eseguirà la scansione di tutti i file per garantire uno schema coerente. Questo è particolarmente costoso perché non è un'operazione distribuita. Assicurarsi che questo sia spento facendo

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

+2

a partire da Spark 2.0 DirectParquetOutputCommitter non è più disponibile. vedi [SPARK-10063] (https://issues.apache.org/jira/browse/SPARK-10063) per la nuova soluzione –

+0

@TalJoffe hai provato la loro soluzione? Se sì, come ha funzionato? E puoi rispondere con come? – David

+0

L'ho provato funziona abbastanza bene. Ho fatto un piccolo test su una cartella da 30g e le prestazioni erano praticamente le stesse –

3

Il committer uscita diretta è andato dal codice base scintilla; devi scrivere il tuo/resuscitare il codice cancellato nel tuo JAR. Se lo fai, disattiva la speculazione nel tuo lavoro e sappi che anche altri errori possono causare problemi, laddove il problema è "dati non validi".

Su una nota più brillante, Hadoop 2.8 aggiungerà alcuni Speedup S3A specificamente per la lettura di formati binari ottimizzati (ORC, Parquet) off S3; vedi HADOOP-11694 per dettagli. E alcune persone stanno lavorando su Amazon Dynamo per l'archivio di metadati coerente che dovrebbe essere in grado di eseguire un commit O (1) robusto alla fine del lavoro.