Sto provando a lavorare con l'API di kafka in java. Sto usando la seguente dipendenza di Maven:Come posso produrre messaggi con l'API di Kafka 8.2 in Java?
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
Ho problemi di connessione a un server kafka remoto. Ho cambiato l'attributo di porta file kafka 'server.properties' nella porta 8080. Posso avviare sia il guardiano che il server di kafka senza problemi. Posso anche usare la console di produzione e le applicazioni consumer fornite con il download di kafka. (Scala 2.10 versione)
Sto utilizzando il seguente codice client per creare un KafkaProducer remota
Properties propsProducer = new Properties();
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");
KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);
Una volta ho creato il produttore, posso eseguire la seguente riga e ottenere valide informazioni argomento è tornato, concesso strTopic è un nome di argomento esistente.
List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);
Quando tento di inviare un messaggio, effettuare le seguenti operazioni:
ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());
RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();
La chiamata a send() blocchi a tempo indeterminato e, quando termino il processo manualmente, vedo che la presa Errore durante la chiusura a causa di un errore sull'errore del server kafka (IOException, Connection Reset by Peer).
Inoltre, non vale nulla che le proprietà host.name, advertised.host.name e advertised.port siano ancora commentate nel file 'server.properties'. Oh, e se cambio la linea:
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
a
propsProducer.put("bootstrap.servers", "127.0.0.1:8080");
ed eseguirlo sullo stesso server su cui è installato il server di Kafka, funziona, ma sto cercando di lavorare con essa da remoto.
Apprezzare qualsiasi aiuto e se posso chiarire, fatemi sapere.
Stai letteralmente utilizzando '172.xx.xx.xxx' come indirizzo IP host? –
No, è un IP completo le x sono solo maschere. –
Kk. Forse il problema del firewall? È possibile convalidare la connettività di rete sulla porta 8080 utilizzando netcat? –