2015-04-03 17 views
7

Ho un dump di wikipedia compresso con bzip2 (scaricato da http://dumps.wikimedia.org/enwiki/), ma non voglio decomprimerlo: voglio elaborarlo durante la decompressione al volo.Ingresso compresso BZip2 per Apache Flink

So che è possibile farlo in Java semplice (vedere ad esempio Java - Read BZ2 file and uncompress/parse on the fly), ma mi chiedevo come farlo in Apache Flink? Quello di cui ho probabilmente bisogno è qualcosa come https://github.com/whym/wikihadoop ma per Flink, non Hadoop.

risposta

5

E 'possibile leggere i file compressi nei seguenti formati in Apache Flink:

org.apache.hadoop.io.compress.BZip2Codec 
org.apache.hadoop.io.compress.DefaultCodec 
org.apache.hadoop.io.compress.DeflateCodec 
org.apache.hadoop.io.compress.GzipCodec 
org.apache.hadoop.io.compress.Lz4Codec 
org.apache.hadoop.io.compress.SnappyCodec 

Come si può vedere dai nomi dei pacchetti, Flink fa questo usando InputFormats di Hadoop. Questo è un esempio per la lettura di file gz utilizzando API Scala di Flink: (è necessario almeno Flink 0.8.1)

def main(args: Array[String]) { 

    val env = ExecutionEnvironment.getExecutionEnvironment 
    val job = new JobConf() 
    val hadoopInput = new TextInputFormat() 
    FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz")) 
    val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job) 

    lines.print 

    env.execute("Read gz files") 
} 

Apache Flink ha solo costruire il supporto integrato per .deflate file. Aggiungere supporto per più codec di compressione è facile, ma non è ancora stato fatto.

L'utilizzo di HadoopInputFormats con Flink non causa alcuna perdita di prestazioni. Flink ha un supporto di serializzazione integrato per i tipi di Hadoop Writable.