Sto cercando di scrivere un file parquet
su Amazon S3
utilizzando Spark 1.6.1
. Il piccolo parquet
che sto generando è ~2GB
una volta scritto quindi non si tratta di molti dati. Sto cercando di dimostrare Spark
come piattaforma che posso usare.L'utilizzo di Spark per scrivere un file parquet su s3 su s3a è molto lento
Fondamentalmente quello che sto andando è impostare un star schema
con dataframes
, quindi ho intenzione di scrivere quei tavoli sul parquet. I dati provengono da file CSV forniti da un fornitore e sto utilizzando Spark come piattaforma ETL
. Attualmente ho un cluster a 3 nodi in ec2(r3.2xlarge)
Quindi 120GB
di memoria sugli esecutori e 16 core totali.
I file di input ammontano a circa 22 GB e per il momento sto estraendo circa 2 GB di tali dati. Alla fine questo sarà di molti terabyte quando comincio a caricare l'intero set di dati.
Qui è la mia scintilla/Scala pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6)))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
Il conteggio dura circa 2 minuti per 465884512 righe. La scrittura per parquet prende 38 minuti
Capisco che il coalesce
fa un riordino al driver che fa la scrittura .... ma la quantità di tempo che sta prendendo mi sta facendo pensare che sto facendo qualcosa di seriamente sbagliato . Senza lo coalesce
, ci vogliono ancora 15 minuti, il quale IMO è ancora troppo lungo e mi dà un sacco di piccoli file parquet
. Mi piacerebbe avere un grande file al giorno di dati che avrò. Ho il codice per fare il partizionamento da un valore di campo, ed è altrettanto lento. Ho anche provato a trasmettere questo a csv
e che richiede ~ 1 ora.
Inoltre, in realtà non sto impostando i puntelli durante la presentazione del mio lavoro. Le mie statistiche di console per un posto di lavoro sono:
- Alive Addetti: 2
- core in uso: 16 Totale, 16 Usato
- Memoria in uso: 117,5 GB totali, 107,5 GB Usato
- Applicazioni: 1 Correre, 5 Completato
- Driver: 0 corsa, 0 Completato
- Stato: ALIVE
una coalesce non mescola il driver che mescola tra gli esecutori, ma questo è irrilevante per il problema che si sta vedendo. Stai usando EMR? se è così usa s3: // e non s3a: //. in entrambi i casi su Spark 1.6 dovresti usare Direct OutputCommitter come dice @David. Un altro possibile miglioramento consiste nell'impostare parquet.enable.summary-metadata su false –
L'utilizzo di Alluxio di fronte a S3 lo velocizza? –