2016-02-15 33 views
6

La mia comprensione del metodo fileStream() di Spark è che ci vogliono tre tipi come parametri - K ey, V alore e F ormat. Nel caso di file di testo, i tipi appropriati sono LongWritable, Text e TextInputFormat. Voglio innanzitutto capire la natura di questi tipi. Intuitivamente, direi che lo Key in questo caso è il numero di riga del file e il Value è il testo su quella riga. Quindi, nel seguente esempio di un file di testo:Come leggo i file parquet usando `ssc.fileStream()`, e qual è la natura dei tipi passati a `ssc.fileStream()`

Hello 
Test 
Another Test 

La prima riga del DSTREAM avrebbe un Key di 1 (0?) E una Value di Hello.

È corretto?

La parte successiva della mia domanda: ho guardato l'attuazione decompilato di ParquetInputFormat e ho notato qualcosa di curioso:

enter image description here

enter image description here

TextInputFormat estende FileInputFormat di tipi LongWritable e Text, mentre ParquetInputFormat estende la stessa classe di tipi Void e T.

Ciò significa che devo creare una classe Value per contenere un'intera riga dei miei dati parquet e quindi passare i tipi [Void, MyClass, ParquetInputFormat[MyClass]] a ssc.fileStream()?

Se sì, come devo implementare MyClass?

Qualsiasi altra guida è molto gradita.

MODIFICA: Ho notato un readSupportClass che deve essere passato agli oggetti ParquetInputFormat. Che tipo di classe è questa e come viene utilizzata per analizzare il file del parquet? È qualcosa che dovrei sapere e capire?

Come una parte - c'è qualche documentazione che copre questo? Non sono riuscito a trovarne.

MODIFICA 2: per quanto posso dire, questo è impossibile. Se qualcuno sa come trasmettere file in parquet a Spark, per favore sentiti libero di condividerlo.

risposta

5

Il mio esempio per leggere i file parquet in Spark Streaming è riportato di seguito.

val ssc = new StreamingContext(sparkConf, Seconds(2)) 
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport") 
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
    directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration) 

val lines = stream.map(row => { 
    println("row:" + row.toString()) 
    row 
}) 

Alcuni punti sono ...

  • tipo di record è GenericRecord
  • readSupportClass è AvroReadSupport
  • passaggio di configurazione per FILESTREAM
  • set parquet.read.support.classe alla Configurazione

Ho fatto riferimento ai codici sorgente di seguito per la creazione di un campione.
E inoltre non sono riuscito a trovare buoni esempi.
Mi piacerebbe aspettare meglio.

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

+0

Sai come risolvere l'errore non found' 'parquet.avro.AvroReadSupport in fase di esecuzione? Come stai gestendo il tuo lavoro? È incluso nei miei deps in build.sbt e tuttavia spark non riesce a trovarlo. – Niemand

+0

Hai impostato il classpath parquet-avro.jar su driver ed esecutori? – tabata

+0

Il mio ambiente è CDH 5.6.x e parquet-avro.jar è già stato incluso in hadoop lib dir. – tabata