2016-04-21 27 views
5

Nel nostro lavoro spark-streaming leggiamo messaggi in streaming da kafka.get topic from kafka message in spark

Per questo, utilizziamo l'API KafkaUtils.createDirectStream che restituisce JavaPairInputDStreamfrom.

I messaggi vengono letti da Kafka (da tre temi - test1, test2, test3) nel seguente modo:

private static final String TOPICS = "test1,test2,test3"; 
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(","))); 

HashMap<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", BROKERS); 

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
       streamingContext, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topicsSet 
       ); 

Vogliamo gestire i messaggi provenienti da ogni argomento in modo diverso, e al fine di per raggiungere questo obiettivo è necessario conoscere il nome dell'argomento per ciascun messaggio.

in modo da effettuare le seguenti operazioni:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction()); 

e questo è l'implementazione del SplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> { 
    @Override 
    public String call(Tuple2<String, String> tuple2) 
    { 
     System.out.println(tuple2._1); 
     return tuple2._2(); 
    } 
} 

Il problema è che il tuple2._1 è nulla e si presume che il tuple2._1 conterrà alcune metadati come il nome dell'argomento/partizione da cui il messaggio proviene.

Tuttavia, quando si stampa tuple2._1, è nullo.

La nostra domanda: c'è un modo per inviare il nome dell'argomento in kafka in modo che nel codice spark-streaming, lo tuple2._1 lo contenga (e non sia nullo)?

Nota che abbiamo anche cercato di ottenere i nomi degli argomenti dalla DSTREAM come indicato nel spark-streaming kafka-integration tutorial:

ma restituisce tutti gli argomenti che sono stati inviati al KafkaUtils.createDirectStream, e non il tema specifico da dove i messaggi (che appartengono all'attuale RDD) arrivati ​​da.

Quindi non ci ha aiutato a identificare il nome dell'argomento da cui i messaggi nell'RDD sono stati inviati.

EDIT

in risposta alla risposta di David - Ho provato ad utilizzare il MessageAndMetadata in questo modo:

 Map<TopicAndPartition, Long> topicAndPartition = new HashMap(); 
     topicAndPartition.put(new TopicAndPartition("test1", 0), 1L); 
     topicAndPartition.put(new TopicAndPartition("test2", 0), 1L); 
     topicAndPartition.put(new TopicAndPartition("test3", 0), 1L); 

     class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> 
     { 

      @Override 
      public String call(MessageAndMetadata<String, String> v1) 
        throws Exception { 
       // nothing is printed here 
       System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition()); 
       return v1.topic(); 
      } 

     } 

     JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction()); 
     messages.foreachRDD(new VoidFunction() { 

      @Override 
      public void call(Object t) throws Exception { 
       JavaRDD<String> rdd = (JavaRDD<String>)t; 
       OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
       // here all the topics kafka listens to are printed, but that doesn't help 
       for (OffsetRange offset : offsets) { 
        System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset()); 
       } 
      } 
     }); 

Il problema è che nulla è stato stampato nel metodo MessageAndMetadataFunction.call. cosa devo risolvere per ottenere l'argomento pertinente per tale RDD all'interno del metodo MessageAndMetadataFunction.call?

+0

Cosa vuol dire "nulla viene stampato qui"? Nemmeno la parte "topic =", o quella parte viene stampata ma i valori sono vuoti. –

+0

Se non, allora si dovrebbe guardare in 'log YARN', o qualunque gruppo che si sta girando. Per me, ci sono file di log in '/ usr/local/hadoop/logs/userLogs /' che catturano 'stdout' dai tuoi esecutori. –

+0

Mi dispiace, conosco il problema ora. È perché il tuo 'MessageAndMetadataFunction' deve restituire sia l'argomento che il messaggio cuciti insieme in un singolo record. In questo momento stai restituendo solo l'argomento, non il messaggio stesso. Questo è il motivo per cui l'argomento viene stampato più e più volte - perché è ciò che stai restituendo da "MessageAndMetadataFunction" - restituiscilo entrambi, avrai entrambi. –

risposta

6

Utilizzare una delle versioni di createDirectStream che accetta come parametro una funzione messageHandler. Ecco quello che faccio:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
    ssc, 
    kafkaParams, 
    getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap, 
    (msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)} 
) 

C'è roba lì che non significa niente per te - la parte rilevante è

(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)} 

Se non si ha familiarità con Scala, tutta la funzione fa è di ritorno a Tuple2 contenente msg.topic e msg.message. La tua funzione deve restituire entrambi questi elementi per poterli utilizzare a valle.Potresti semplicemente restituire l'intero oggetto MessageAndMetadata, che ti offre un paio di altri campi interessanti. Ma se si desidera solo il topic e il message, quindi utilizzare quanto sopra.

+0

hey sembra che tu abbia un rinforzo extra puoi correggerlo per favore. –

+1

@David puoi per favore fornire un esempio funzionante o dettagliato in Scala. Come sono confuso con questi parametri daOffsets, messageHandler. Ringraziandovi! –

1

Nella parte inferiore della Kafka integration guide, c'è un esempio che estrae l'argomento dai messaggi.

Il relativo codice in Java:

// Hold a reference to the current offset ranges, so it can be used downstream 
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); 

directKafkaStream.transformToPair(
    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { 
    @Override 
    public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { 
     OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
     offsetRanges.set(offsets); 
     return rdd; 
    } 
    } 
).map(
    ... 
).foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() { 
    @Override 
    public Void call(JavaPairRDD<String, String> rdd) throws IOException { 
     for (OffsetRange o : offsetRanges.get()) { 
     System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() 
     ); 
     } 
     ... 
     return null; 
    } 
    } 
); 

Ciò può essere probabilmente crollato in qualcosa di più compatto che chiede solo per il tema e nient'altro.

+0

L'ho provato, stampa tutti gli argomenti che Kafka ascolta, anziché solo l'argomento relativo al RDD corrente. per esempio. - Se ascolto 3 argomenti - test1, test2, test3 - e i messaggi arrivano solo dal test1, questo codice stamperà test1, test2, test3 per ciascun RDD. quindi questo codice non mi aiuta –

+0

Questo non funziona – User3