2015-07-08 2 views
5

Desidero scrivere un test per la mia applicazione di streaming spark che consuma una sorgente flume.Utilizzo di un file di testo come sorgente di streaming Spark a scopo di verifica

http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/ suggerisce di utilizzare ManualClock ma per il momento leggere un file e verificare le uscite sarebbe sufficiente per me.

Così desidero utilizzare:

JavaStreamingContext streamingContext = ... 
JavaDStream<String> stream = streamingContext.textFileStream(dataDirectory); 
stream.print(); 
streamingContext.awaitTermination(); 
streamingContext.start(); 

Purtroppo non stampa nulla.

Ho provato:

  • DataDirectory = "HDFS: // nodo: porta/assoluto/percorso/a/HDFS /"
  • DataDirectory = "file: // C: \\ assoluto \\ percorso \\ su \\ Windows \\"
  • aggiungendo il file di testo nella directory prima che il programma inizia
  • aggiungendo il file di testo nella directory mentre l'esecuzione del programma

Niente funziona.

Qualche suggerimento da leggere dal file di testo?

Grazie,

Martin

risposta

0

Io sono così stupido, ho invertito le chiamate per avviare() e awaitTermination()

Se si vuole fare lo stesso, si dovrebbe leggere da HDFS, e aggiungi il file WHILE del programma.

+1

Ciao @Martin, puoi condividere il codice totale, se possibile? Grazie. – user4342532

8

L'ordine di avvio e di attesa è effettivamente invertito.

In aggiunta a ciò, il modo più semplice per passare i dati all'applicazione Spark Streaming per il test è un QueueDStream. È una coda mutevole di RDD di dati arbitrari. Ciò significa che è possibile creare i dati a livello di programmazione o caricarli dal disco in un RDD e trasferirli al codice Spark Streaming.

Es. per evitare i problemi di temporizzazione del fileConsumer, provare:

val rdd = sparkContext.textFile(...) 
val rddQueue: Queue[RDD[String]] = Queue() 
rddQueue += rdd 
val dstream = streamingContext.queueStream(rddQueue) 
doMyStuffWithDstream(dstream) 
streamingContext.start() 
streamingContext.awaitTermination()