Ho un processo sparkming che legge da Kafka ogni 5 secondi, esegue una trasformazione sui dati in arrivo e quindi scrive sul file system.Come interrompere lo streaming spark quando la sorgente dati è esaurita
Questo non ha davvero bisogno di essere un processo di streaming, e in realtà, voglio solo eseguirlo una volta al giorno per drenare i messaggi sul file system. Non sono sicuro di come fermare il lavoro.
Se passo un timeout al streamingContext.awaitTermination, non si ferma il processo, non fa altro che provoca il processo per riprodursi errori quando arriva il momento di scorrere sul flusso (vedi errore sotto)
Qual è il modo migliore per realizzare quello che sto cercando di fare
questo è per Spark 1.6 su Python
EDIT:
grazie alla @marios la soluzione era questa:
ssc.start()
ssc.awaitTermination(10)
ssc.stop()
che esegue lo script per dieci secondi prima dell'arresto.
codice semplificato:
conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
stream = KafkaUtils.createStream(
ssc,
kafkaParams["zookeeper.connect"],
"vehicle-data-importer",
topicPartitions,
kafkaParams)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')
ssc.start()
ssc.awaitTermination(10)
errore:
16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:232)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)
at com.sun.proxy.$Proxy14.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)
Come puoi sapere che la sorgente dati è fuori? A proposito, puoi chiamare 'ssc.stop()' dopo 'ssc.awaitTermination' per fermare l'applicazione Streaming. – zsxwing
IMHO, se hai solo bisogno di leggere i dati una volta al giorno, crea un lavoro Spark Batch per leggere ed elaborare i dati e utilizzare ulteriormente un programmatore come cron o Quartz per pianificare il tuo lavoro. – Sumit
Il problema con l'esecuzione di un singolo batch (utilizzando createRDD) è che non esiste un modo semplice per tracciare gli offset all'interno di Zookeeper. Questa è una delle cose che mi piacerebbe ottenere qui – lostinplace