2016-01-29 19 views
10

Ho un processo sparkming che legge da Kafka ogni 5 secondi, esegue una trasformazione sui dati in arrivo e quindi scrive sul file system.Come interrompere lo streaming spark quando la sorgente dati è esaurita

Questo non ha davvero bisogno di essere un processo di streaming, e in realtà, voglio solo eseguirlo una volta al giorno per drenare i messaggi sul file system. Non sono sicuro di come fermare il lavoro.

Se passo un timeout al streamingContext.awaitTermination, non si ferma il processo, non fa altro che provoca il processo per riprodursi errori quando arriva il momento di scorrere sul flusso (vedi errore sotto)

Qual è il modo migliore per realizzare quello che sto cercando di fare

questo è per Spark 1.6 su Python

EDIT:

grazie alla @marios la soluzione era questa:

ssc.start() 
ssc.awaitTermination(10) 
ssc.stop() 

che esegue lo script per dieci secondi prima dell'arresto.

codice semplificato:

conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true') 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc, 5) 
stream = KafkaUtils.createStream(
    ssc, 
    kafkaParams["zookeeper.connect"], 
    "vehicle-data-importer", 
    topicPartitions, 
    kafkaParams) 

stream.saveAsTextFiles('stream-output/kafka-vehicle-data') 

ssc.start() 
ssc.awaitTermination(10) 

errore:

16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB) 
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers 
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200 
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms 
py4j.Py4JException: Cannot obtain a new communication channel 
    at py4j.CallbackClient.sendCommand(CallbackClient.java:232) 
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111) 
    at com.sun.proxy.$Proxy14.call(Unknown Source) 
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92) 
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) 
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB) 
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB) 
+0

Come puoi sapere che la sorgente dati è fuori? A proposito, puoi chiamare 'ssc.stop()' dopo 'ssc.awaitTermination' per fermare l'applicazione Streaming. – zsxwing

+0

IMHO, se hai solo bisogno di leggere i dati una volta al giorno, crea un lavoro Spark Batch per leggere ed elaborare i dati e utilizzare ulteriormente un programmatore come cron o Quartz per pianificare il tuo lavoro. – Sumit

+0

Il problema con l'esecuzione di un singolo batch (utilizzando createRDD) è che non esiste un modo semplice per tracciare gli offset all'interno di Zookeeper. Questa è una delle cose che mi piacerebbe ottenere qui – lostinplace

risposta

5

Sembra che il metodo giusto da chiamare è awaitTerminationOrTimeout (auto, timeout).

Non sono sicuro se interrompe anche il contesto di streaming. Quindi forse puoi chiamare ssc.stop() subito dopo il timeout.

ssc.start() 
ssc.awaitTerminationOrTimeout(10) 
ssc.stop() 

Nota: Date un'occhiata here per una domanda simile.

+0

Se trovi un dupe flag. –

+0

Questo non è esattamente duplicato (ma può essere rilevante per qualcuno). Il problema è come fermare lo streaming socket per non abbassare l'intero contesto della scintilla.Inoltre, il problema si è verificato in un contesto di Twitter in esecuzione in Scala e non in Kafka in esecuzione su pyspark. – marios

+1

Quindi forse un commento è più adatto se questa non è la risposta –

1

avere una prova Kafka "consumer.timeout.ms" parametro, che si concluderà con grazia KafkaReceiver. (Da kafka 0.8 configuration)

Throw a timeout exception to the consumer if no message is available for consumption after the specified interval

HDF = KafkaUtils.createStream(ssc, topics={strLoc : 1}, kafkaParams={"consumer.timeout.ms":"20000" }, zkQuorum='xxx:2181', groupId='xxx-consumer-group') 

Non sarà in grado di ricevere nuovi messaggi Kafka in streaming corrente esecuzione e ottengono sempre RDD vuoti.
E controllare il conteggio degli RDD vuoti in DSteam.foreachRDD (func). Terminare l'esecuzione di streaming se si ottengono continuamente RDD vuoti.

+0

provato questo: 'kafkaParams = { "metadata.broker.list": str.join (' ' intermediari), "zookeeper.connect": str.join (',', zkNodes), " consumer.id ":" vehicle-data-importatore ", " group.id ":" importatori ", " auto.offset.reset ":" il più piccolo ", " consumer.timeout.ms ":" 10000 " } – lostinplace

+0

"consumer.timeout.ms" non interrompe l'esecuzione dello streaming ma termina il ricevitore kafka in attesa di altri messaggi anche se il nuovo messaggio è arrivato più tardi. puoi semplicemente usare "ssc.awaitTerminationOrTimeout (10)" ma non è sicuro. –