2016-01-21 39 views
12

Sto scrivendo un processo ETL in cui sarà necessario leggere i file di registro orari, suddividere i dati e salvarli. Sto usando Spark (in Databricks). I file di registro sono CSV quindi li leggo e applico uno schema, quindi eseguo le mie trasformazioni.Aggiunta di nuovi dati a file di parquet partizionati

Il mio problema è, come posso salvare i dati di ogni ora come un formato parquet ma aggiungere il set di dati esistente? Al momento del salvataggio, ho bisogno di partizionare di 4 colonne presenti nel dataframe.

Ecco il mio Salva la riga:

data 
    .filter(validPartnerIds($"partnerID")) 
    .write 
    .partitionBy("partnerID","year","month","day") 
    .parquet(saveDestination) 

Il problema è che se esiste la cartella di destinazione del salvataggio genera un errore. Se la destinazione non esiste, non accedo ai miei file.

Ho provato a usare .mode("append") ma trovo che Spark a volte fallisca a metà, quindi finisco per perdere quanto dei miei dati sono scritti e quanto ho ancora bisogno di scrivere.

Sto usando parquet perché il partizionamento aumenta notevolmente le mie ricerche in futuro. Inoltre, devo scrivere i dati come un formato di file su disco e non posso usare un database come Druid o Cassandra.

Qualsiasi suggerimento su come partizionare il mio dataframe e salvare i file (incollandolo al parquet o in un altro formato) è molto apprezzato.

+1

Puoi condividere l'errore si ottiene quando si usa '.mode (append)'. –

+0

L'errore che ottengo è questo: causato da: java.io.IOException: il file esiste già:/tracking/v4/010316/gif = a/partnerID = 111/anno = 2016/mese = 1/giorno = 3/parte -r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet Penso che questo errore venga generato a causa di una mancata corrispondenza della pianificazione delle attività quando alcune operazioni di scrittura richiedono molto tempo. – Saman

risposta

9

Se è necessario aggiungere i file, è necessario utilizzare la modalità di aggiunta. Non so quante partizioni ci si aspetta che generi, ma trovo che se hai molte partizioni, partitionBy causerà un numero di problemi (problemi di memoria e IO allo stesso modo).

Se pensi che il tuo problema è causato da operazioni di scrittura impiega troppo tempo, vi consiglio di provare queste due cose:

1) Utilizzare scattanti con l'aggiunta alla configurazione:

conf.set("spark.sql.parquet.compression.codec", "snappy") 

2) disabilitare la generazione dei file di metadati nella hadoopConfiguration sul SparkContext come questo:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

i metadati-files saranno un po 'di tempo Consumi ng per generare (vedi this blog post), ma in base allo this non sono realmente importanti. Personalmente, li disabilito sempre e non ho problemi.

Se si genera molte partizioni (> 500), temo il meglio che posso fare è suggerire a voi che si guarda in una soluzione non utilizzando accodare-mode - ho semplicemente mai riuscito ad ottenere partitionBy al lavoro con quella molte partizioni.

+0

Grazie, Glennie. Disattivo sempre i file di metadati da essere generati esattamente a causa di quel post del blog: D. Sto sicuramente creando più di 500 partizioni. Credo che la maggior parte dei miei problemi derivino dal fatto che il formato parquet non è stato concepito per essere utilizzato come formato aggiornabile e lo sto trattando come una tabella di database. Hai qualche suggerimento per un altro modo per salvare i miei dati giornalieri? – Saman

+0

ho un problema simile, sto partizionando sulla base del timestamp corrente, con ogni nuova partizione aggiunta crea un task totale uguale alle partizioni finora. Ad esempio, se ci sono 1000 partizioni e 1 nuovo da aggiungere, verrà eseguito 1001 task e aumenterà il tempo complessivo del lavoro. sto facendo qualcosa di sbagliato qui? –

0

Se si utilizza il partizionamento non ordinato, i dati verranno suddivisi in tutte le partizioni. Ciò significa che ogni attività genererà e scriverà i dati su ciascuno dei tuoi file di output.

Considerate partizionamento dei dati in base alle vostre colonne di partizione prima di scrivere per avere tutti i dati per file di output sugli stessi partizioni:

data 
.filter(validPartnerIds($"partnerID")) 
.repartition([optional integer,] "partnerID","year","month","day") 
.write 
.partitionBy("partnerID","year","month","day") 
.parquet(saveDestination) 

See: DataFrame.repartition