Sto cercando di ottenere e memorizzare l'offset per un messaggio specifico in Kafka utilizzando Spark Direct Stream. Guardare la documentazione di Spark è semplice per ottenere gli offset di gamma per ogni partizione, ma quello di cui ho bisogno è di memorizzare l'offset iniziale per ogni messaggio di un argomento dopo una scansione completa della coda.È possibile ottenere uno specifico offset di messaggio in Kafka + SparkStreaming?
5
A
risposta
6
Sì, è possibile utilizzare la versione MessageAndMetadata di createDirectStream
che consente di accedere a message metadata
.
È possibile trovare un esempio qui che restituisce Dstream di tuple3
.
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
Nel precedente esempio tuple3._1
avranno topic
, tuple3._2
avrà offset
e tuple3._3
avrà message
.
Spero che questo aiuti!
Se ho ragione, sarò in grado di leggere da uno specifico offset. Mi chiedo ancora se esiste un modo semplice per calcolare l'offset iniziale di ogni messaggio all'interno di una partizione. Quello di cui ho bisogno è di memorizzare l'offset per ogni messaggio e quindi utilizzare questo codice per leggere un messaggio specifico. Grazie! –
Sì, avevi ragione, ma con il codice sopra troverai anche l'offset associato a ciascun messaggio in 'messagesDStream'. Voglio dire 'createDirectStream' ti dà' Dstream' di 'Tuple3' e in ogni tupla otterrai' topic-name' e 'message' e il suo' offset' associato. – avr
Ciao, mi dispiace per la risposta in ritardo .. Funziona. Tuttavia suppongo che "fromOffset" sia l'offset iniziale da cui eseguire la scansione della partizione. Molte grazie avr –