Nel nostro lavoro spark-streaming leggiamo messaggi in streaming da kafka.get topic from kafka message in spark
Per questo, utilizziamo l'API KafkaUtils.createDirectStream
che restituisce JavaPairInputDStreamfrom
.
I messaggi vengono letti da Kafka (da tre temi - test1, test2, test3) nel seguente modo:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Vogliamo gestire i messaggi provenienti da ogni argomento in modo diverso, e al fine di per raggiungere questo obiettivo è necessario conoscere il nome dell'argomento per ciascun messaggio.
in modo da effettuare le seguenti operazioni:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
e questo è l'implementazione del SplitToLinesFunction
:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
Il problema è che il tuple2._1
è nulla e si presume che il tuple2._1
conterrà alcune metadati come il nome dell'argomento/partizione da cui il messaggio proviene.
Tuttavia, quando si stampa tuple2._1
, è nullo.
La nostra domanda: c'è un modo per inviare il nome dell'argomento in kafka in modo che nel codice spark-streaming, lo tuple2._1
lo contenga (e non sia nullo)?
Nota che abbiamo anche cercato di ottenere i nomi degli argomenti dalla DSTREAM come indicato nel spark-streaming kafka-integration tutorial:
ma restituisce tutti gli argomenti che sono stati inviati al KafkaUtils.createDirectStream
, e non il tema specifico da dove i messaggi (che appartengono all'attuale RDD) arrivati da.
Quindi non ci ha aiutato a identificare il nome dell'argomento da cui i messaggi nell'RDD sono stati inviati.
EDIT
in risposta alla risposta di David - Ho provato ad utilizzare il MessageAndMetadata
in questo modo:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
Il problema è che nulla è stato stampato nel metodo MessageAndMetadataFunction.call
. cosa devo risolvere per ottenere l'argomento pertinente per tale RDD all'interno del metodo MessageAndMetadataFunction.call
?
Cosa vuol dire "nulla viene stampato qui"? Nemmeno la parte "topic =", o quella parte viene stampata ma i valori sono vuoti. –
Se non, allora si dovrebbe guardare in 'log YARN', o qualunque gruppo che si sta girando. Per me, ci sono file di log in '/ usr/local/hadoop/logs/userLogs /' che catturano 'stdout' dai tuoi esecutori. –
Mi dispiace, conosco il problema ora. È perché il tuo 'MessageAndMetadataFunction' deve restituire sia l'argomento che il messaggio cuciti insieme in un singolo record. In questo momento stai restituendo solo l'argomento, non il messaggio stesso. Questo è il motivo per cui l'argomento viene stampato più e più volte - perché è ciò che stai restituendo da "MessageAndMetadataFunction" - restituiscilo entrambi, avrai entrambi. –