2016-03-19 23 views
5

I file di output generati tramite il metodo Spark SQL DataFrame.write() iniziano con il prefisso "part". per esempio.Modifica prefisso nome file di output per DataFrame.write()

DataFrame sample_07 = hiveContext.table("sample_07"); 
sample_07.write().parquet("sample_07_parquet"); 

risultati in:

hdfs dfs -ls sample_07_parquet/                                        
Found 4 items 
-rw-r--r-- 1 rob rob   0 2016-03-19 16:40 sample_07_parquet/_SUCCESS 
-rw-r--r-- 1 rob rob  491 2016-03-19 16:40 sample_07_parquet/_common_metadata 
-rw-r--r-- 1 rob rob  1025 2016-03-19 16:40 sample_07_parquet/_metadata 
-rw-r--r-- 1 rob rob  17194 2016-03-19 16:40 sample_07_parquet/part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet 

vorrei cambiare il prefisso di uscita nome del file utilizzato per la creazione di un file utilizzando Spark SQL DataFrame.write(). Ho provato a impostare la proprietà "mapreduce.output.basename" sulla configurazione di hadoop per il contesto Spark. per esempio.

public class MyJavaSparkSQL { 

    public static void main(String[] args) throws Exception { 
    SparkConf sparkConf = new SparkConf().setAppName("MyJavaSparkSQL"); 
    JavaSparkContext ctx = new JavaSparkContext(sparkConf); 
    ctx.hadoopConfiguration().set("mapreduce.output.basename", "myprefix"); 
    HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(ctx.sc()); 
    DataFrame sample_07 = hiveContext.table("sample_07"); 
    sample_07.write().parquet("sample_07_parquet"); 
    ctx.stop(); 
    } 

che non cambiare il prefisso di uscita nome del file per i file generati.

Esiste un modo per sovrascrivere il prefisso del nome file di output quando si utilizza il metodo DataFrame.write()?

risposta

7

Non è possibile modificare il prefisso "part" mentre si utilizza uno dei formati di output standard (come Parquet). Vedere questo frammento da ParquetRelation source code:

private val recordWriter: RecordWriter[Void, InternalRow] = { 
    val outputFormat = { 
    new ParquetOutputFormat[InternalRow]() { 
     // ... 
     override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { 
     // .. 
     // prefix is hard-coded here: 
     new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") 
    } 
    } 
} 

Se davvero necessario controllare i nomi dei file parte, probabilmente dovrete implementare un FileOutputFormat personalizzato e utilizzare uno dei Spark di salvare i metodi che accettano una classe FileOutputFormat (ad es saveAsHadoopFile) .

+0

Grazie per la risposta. Molto apprezzato. – Rob