2015-03-30 2 views
5

Sto provando a lavorare con l'API di kafka in java. Sto usando la seguente dipendenza di Maven:Come posso produrre messaggi con l'API di Kafka 8.2 in Java?

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.0</version> 
</dependency> 

Ho problemi di connessione a un server kafka remoto. Ho cambiato l'attributo di porta file kafka 'server.properties' nella porta 8080. Posso avviare sia il guardiano che il server di kafka senza problemi. Posso anche usare la console di produzione e le applicazioni consumer fornite con il download di kafka. (Scala 2.10 versione)

Sto utilizzando il seguente codice client per creare un KafkaProducer remota

Properties propsProducer = new Properties(); 

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("topic.metadata.refresh.interval.ms", "0"); 

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer); 

Una volta ho creato il produttore, posso eseguire la seguente riga e ottenere valide informazioni argomento è tornato, concesso strTopic è un nome di argomento esistente.

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic); 

Quando tento di inviare un messaggio, effettuare le seguenti operazioni:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes()); 

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get(); 

La chiamata a send() blocchi a tempo indeterminato e, quando termino il processo manualmente, vedo che la presa Errore durante la chiusura a causa di un errore sull'errore del server kafka (IOException, Connection Reset by Peer).

Inoltre, non vale nulla che le proprietà host.name, advertised.host.name e advertised.port siano ancora commentate nel file 'server.properties'. Oh, e se cambio la linea:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 

a

propsProducer.put("bootstrap.servers", "127.0.0.1:8080"); 

ed eseguirlo sullo stesso server su cui è installato il server di Kafka, funziona, ma sto cercando di lavorare con essa da remoto.

Apprezzare qualsiasi aiuto e se posso chiarire, fatemi sapere.

+1

Stai letteralmente utilizzando '172.xx.xx.xxx' come indirizzo IP host? –

+0

No, è un IP completo le x sono solo maschere. –

+0

Kk. Forse il problema del firewall? È possibile convalidare la connettività di rete sulla porta 8080 utilizzando netcat? –

risposta

3

Dopo molte operazioni di scavo, ho deciso di implementare l'esempio trovato qui: Kafka Producer Example. Ho accorciato il codice e non ho implementato una classe di partizionamento. Ho aggiornato il mio pom con la dipendenza elencata e stavo ancora avendo lo stesso problema. Alla fine ho apportato alcune modifiche alla configurazione e tutto ha funzionato.

Il pezzo finale del puzzle era la definizione del server Kafka in/etc/hosts del server e dei computer client. Ho aggiunto quanto segue a entrambi i file.

172.xx.xx.xxx  serverHost1 

Ancora, le x sono solo maschere. Quindi, ho impostato advertised.host.name nel file server.properties su serverHost1. NOTA: ho ottenuto quell'IP dopo aver eseguito un ifconfig sulla macchina server.

ho cambiato la linea

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080"); 

a

propsProducer.put("metadata.broker.list", "serverHost1:8080"); 

Il Kafka API non mi è piaciuto il fatto che mi è stato la definizione di un IP come una stringa.Invece cercava l'IP dal file etc/hosts anche se la documentazione dice:

"Nome host che il broker pubblicizzerà a produttori e consumatori Se non impostato, utilizza il valore per" host.name "se configurato Altrimenti, utilizzerà il valore restituito da java.net.InetAddress.getCanonicalHostName(). "

Che restituirà solo l'IP, nella stringa, in precedenza utilizzavo se non definito in etc/hosts della macchina client, altrimenti restituisce il nome associato all'IP (serverHost1 nel mio caso). Inoltre, non ho mai impostato il valore di host.name.

+0

bootstrap.servers sostituisce metadata.broker.list? –

+1

Sì, credo di si. Nella versione 0.8.2.0, il campo era "metadata.broker.list", ma nelle versioni più recenti è "boostrap.servers" –

+0

Sì !! È vero. con il nuovo ProducerAPI questa è la nuova configurazione. –