Sto tentando di usare il SimpleConsumer in Kafka 9 per consentire agli utenti di riprodurre gli eventi da un offset di tempo - ma i messaggi che ricevo di ritorno da Kafka sono in modo molto strano di codifica:Kafka Java SimpleConsumer strana codifica
7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p=
������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"
Utilizzando KafkaConsumer questi messaggi vengono analizzati correttamente. Ecco il codice che sto usando per recuperare i messaggi utilizzando il SimpleConsumer:
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
log.debug("Found an old offset - skip");
continue;
}
readOffset = messageAndOffset.nextOffset();
int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
byte[] data = messageAndOffset.message().payload().array();
byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
log.debug("Read " + new String(realData, "UTF-8"));
}
ho aggiunto il codice per saltare la prima x byte dopo ho continuato a ottenere UTF-32 errori su byte troppo elevata, che presumo è perché Kafka antepone informazioni come le dimensioni dei messaggi al carico utile. Questo è un artefatto di Avro?
Non sembra Avro - almeno non codifica Avro binaria. Nella codifica binaria non si otterrebbero le informazioni sullo schema nel record. –
Il mio codice è leggermente diverso - Invece di usare 'payload(). Array()', lo faccio come il modo in cui è fatto qui: https://cwiki.apache.org/confluence/display/KAFKA/0.8. 0 + SimpleConsumer + Esempio Eg: 'payload(). Get (bytes)' dove 'bytes' è di tipo' byte [] '. Il metodo 'get()' copia i dati, mentre 'array()' restituisce l'array attuale, e nel Javadocs per 'ByteBuffer' dice:" Le modifiche al contenuto di questo buffer causeranno la modifica del contenuto dell'array restituito, e vice versa." Forse qualcosa del genere è ciò che sta accadendo? –
@Gandalf Per favore apri il tuo messaggio solo nel blocco note ++. Se lo apri con un altro wordpad o un blocco note, sembrerà pericoloso. Quindi aprilo nel blocco note ++ e faccelo sapere. – SkyWalker