Sto scrivendo un processo ETL in cui sarà necessario leggere i file di registro orari, suddividere i dati e salvarli. Sto usando Spark (in Databricks). I file di registro sono CSV quindi li leggo e applico uno schema, quindi eseguo le mie trasformazioni.Aggiunta di nuovi dati a file di parquet partizionati
Il mio problema è, come posso salvare i dati di ogni ora come un formato parquet ma aggiungere il set di dati esistente? Al momento del salvataggio, ho bisogno di partizionare di 4 colonne presenti nel dataframe.
Ecco il mio Salva la riga:
data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
Il problema è che se esiste la cartella di destinazione del salvataggio genera un errore. Se la destinazione non esiste, non accedo ai miei file.
Ho provato a usare .mode("append")
ma trovo che Spark a volte fallisca a metà, quindi finisco per perdere quanto dei miei dati sono scritti e quanto ho ancora bisogno di scrivere.
Sto usando parquet perché il partizionamento aumenta notevolmente le mie ricerche in futuro. Inoltre, devo scrivere i dati come un formato di file su disco e non posso usare un database come Druid o Cassandra.
Qualsiasi suggerimento su come partizionare il mio dataframe e salvare i file (incollandolo al parquet o in un altro formato) è molto apprezzato.
Puoi condividere l'errore si ottiene quando si usa '.mode (append)'. –
L'errore che ottengo è questo: causato da: java.io.IOException: il file esiste già:/tracking/v4/010316/gif = a/partnerID = 111/anno = 2016/mese = 1/giorno = 3/parte -r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet Penso che questo errore venga generato a causa di una mancata corrispondenza della pianificazione delle attività quando alcune operazioni di scrittura richiedono molto tempo. – Saman