2016-05-28 24 views
6

Sto provando a collegarmi al mio broker su aws con auto.create.topics.enable = true nel mio file server.properties. Ma quando sto cercando di connettermi al broker usando il produttore client Java, sto ottenendo il seguente error.Campo di lettura dell'errore 'topic_metadata' in Kafka

1197 [kafka-producer-network-thread | producer-1] ERRORE org.apache.kafka.clients.producer.internals.Sender - Errore non rilevato in thread I/O produttore kafka: org.apache.kafka.common.protocol.types.SchemaException: errore durante la lettura del campo 'topic_metadata': errore nella lettura dell'array di dimensioni 619631, solo 37 byte disponibili a org.apache.kafka.common.protocol.types.Schema.read (Schema.java:73) a org.apache.kafka.clients. NetworkClient.parseResponse (NetworkClient.java:380) a org.apache.kafka.clients.NetworkClient.handleCompletedReceives (NetworkClient.java:449) a org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java: 269) allo org.apache.kafka.clients.producer.internals.Sender.run (Sender.java:229) a org.apache.kafka.clients.producer.internals.Sender.run (Sender.java:134) presso java.lang.Thread.run (sorgente sconosciuta)

Di seguito è riportato il codice produttore del cliente.

public static void main(String[] argv){ 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092"); 
     props.put("acks", "all"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 0); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("block.on.buffer.full",true); 
     Producer<String, String> producer = new KafkaProducer<String, String>(props); 
     try{ for(int i = 0; i < 10; i++) 
     { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i))); 
      System.out.println("Tried sending:"+i);} 
     } 
     catch (Exception e){ 
      e.printStackTrace(); 
     } 
     producer.close(); 
} 

Qualcuno può aiutarmi a risolvere questo?

risposta

2

Ho affrontato il problema simile. Il problema qui è che quando c'è una discrepanza tra la versione dei client di kafka nel file pom e il server kafka è diverso. Stavo usando i client kafka 0.10.0.0_1 ma il server kafka era ancora in 0.9.0.0. Così ho aggiornato la versione del server kafka a 10 il problema è stato risolto.

<dependency> 
      <groupId>org.apache.servicemix.bundles</groupId> 
      <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId> 
      <version>0.10.0.0_1</version> 
     </dependency>    
1

Sembra che stavo impostando proprietà errate sul lato client anche il mio file server.properties aveva proprietà che non erano pensate per il client che stavo usando. Così ho deciso di cambiare il client java alla versione 0.9.0 usando maven.

<dependency> 
<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.11</artifactId> 
<version>0.9.0.0</version> 
</dependency> 

il mio file server.properties è il seguente.

broker.id=0 
port=9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/tmp/kafka-logs 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
log.cleaner.enable=false 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=9000 
delete.topic.enable=true 
advertised.host.name=<aws public Ip> 
advertised.port=9092 

Il mio codice produttore sembra

import java.util.Properties; 
    import java.util.concurrent.ExecutionException; 

    import org.apache.kafka.clients.producer.KafkaProducer; 
    import org.apache.kafka.clients.producer.ProducerConfig; 
    import org.apache.kafka.clients.producer.ProducerRecord; 
    import org.apache.kafka.common.serialization.StringSerializer; 
    public class HelloKafkaProducer 
    { 


     public static void main(String args[]) throws InterruptedException,  ExecutionException { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 

     KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props); 

     boolean sync = false; 
     String topic="loader1"; 
     String key = "mykey"; 
     for(int i=0;i<1000;i++) 
     { 
     String value = "myvaluehasbeensent"+i+i; 
     ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value); 
     if (sync) { 
      producer.send(producerRecord).get(); 
     } else { 
      producer.send(producerRecord); 
     } 
     } 
     producer.close(); 
    } 
} 
-1

Ho risolto questo problema modificando

/etc/hosts file 

Controllare il file hosts che se guardiano dello zoo o altro IP del broker non sono in questo file.

1

Assicurarsi di utilizzare le versioni corrette . Diciamo si utilizza seguente Maven dependecy:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId> 
    <version>${flink.version}</version> 
</dependency> 

Così il manufatto è uguale: Flink-connector-Kafka-0.8_2.10

Ora verificare se si utilizza la versione corretta Kafka:

cd /KAFKA_HOME/libs 

ora trovare kafka_YOUR-VERSION-sources.jar.

Nel mio caso ho kafka_2.10-0.8.2.1-sources.jar. Quindi funziona bene! :) Se si utilizzano versioni diverse, è sufficiente modificare maven dependecies OPPURE scaricare la versione corretta di kafka.