2015-07-31 2 views
8

Ho una semplice classe per consumare messaggi da un server kafka. La maggior parte dei codici viene copiata dai commenti di org.apache.kafka.clients.consumer.KafkaConsumer.java.Esempio di consumer Kafka semplice non funzionante

public class Demo { 

    public static void main(String[] args) { 
     Properties props = new Properties(); 
     props.put("metadata.broker.list", "192.168.144.10:29092"); 
     props.put("group.id", "test"); 
     props.put("session.timeout.ms", "1000"); 
     props.put("enable.auto.commit", "true"); 
     props.put("auto.commit.interval.ms", "10000"); 
     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props); 
     consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST"); 
     boolean isRunning = true; 
     while (isRunning) { 
      Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100); 
      process(records); 
     } 
     consumer.close(); 
    } 

    private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) { 
     Map<TopicPartition, Long> processedOffsets = new HashMap<>(); 
     for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) { 
      List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records(); 
      for (int i = 0; i < recordsPerTopic.size(); i++) { 
       ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i); 
       // process record 
       try { 
        processedOffsets.put(record.topicAndPartition(), record.offset()); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
     return processedOffsets; 
    } 
} 

Sto utilizzando 'org.apache.kafka: kafka-client: 0.8.2.0'. getta l'eccezione

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. 
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) 
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48) 
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400) 
    at kafka.integration.Demo.main(Demo.java:26) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) 

Come devo configurare key.deserializer?

+0

guardare con maggiore attenzione l'esempio che è stato copiato da. È lì dentro – kdgregory

+0

sarebbe più utile se si può indicare il posto. – David

risposta

1

è necessario impostare le proprietà:

props.put("serializer.class","my.own.serializer.StringSupport"); 
props.put("key.serializer.class","my.own.serializer.LongSupport"); 

nel metodo principale in modo che li si passa al costruttore del produttore. Ovviamente, dovresti specificare gli encoder giusti. La classe serializzatore converte il messaggio in una matrice di byte e la classe key.serializer trasforma l'oggetto chiave in una matrice di byte. In genere avresti anche la possibilità di invertire la procedura.

10

Questo funziona out of the box senza implementare i propri serializzatori

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("session.timeout.ms", "30000"); 
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("partition.assignment.strategy", "range"); 
2

si tratta di array di byte per il parametro chiave e valore. Sono richiesti serializzatore e deserializzatore Byte.

è possibile aggiungere nelle proprietà,

Per Deserialize

props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer"); 

Per Serialize

props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArraySerializer");