E 'possibile leggere i file compressi nei seguenti formati in Apache Flink:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
Come si può vedere dai nomi dei pacchetti, Flink fa questo usando InputFormats di Hadoop. Questo è un esempio per la lettura di file gz utilizzando API Scala di Flink: (è necessario almeno Flink 0.8.1)
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val job = new JobConf()
val hadoopInput = new TextInputFormat()
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
lines.print
env.execute("Read gz files")
}
Apache Flink ha solo costruire il supporto integrato per .deflate file. Aggiungere supporto per più codec di compressione è facile, ma non è ancora stato fatto.
L'utilizzo di HadoopInputFormats con Flink non causa alcuna perdita di prestazioni. Flink ha un supporto di serializzazione integrato per i tipi di Hadoop Writable
.
fonte
2015-04-03 11:49:20