2016-04-18 51 views
7

Uso lo schema avro Apache con Kafka 0.0.8V. I Usa lo stesso schema ai fini del produttore/consumatore. C'è NESSUNA modifica nello schema. Ma ottengo qualche eccezione al consumatore, quando cerco di consumare i messaggi. Perché ottengo questo errore?La decodifica Avro fornisce java.io.EOFException

Produttore

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException { 
    BinaryEncoder encoder = null; 
    ByteArrayOutputStream out = null; 
    try { 
     DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema); 
     out = new ByteArrayOutputStream(); 
     encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload, encoder); 
     encoder.flush(); 

     byte[] serializedBytes = out.toByteArray(); 

     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes); 

      producer.send(message); 
     } 

consumatori

public void run() { 
     try { 
      ConsumerIterator<byte[], byte[]> itr = stream.iterator(); 
      while (itr.hasNext()) { 

       byte[] data = itr.next().message(); 

       Schema schema = new Schema.Parser() 
         .parse(new File("/Users/xx/avro_schemas/file.avsc")); 

       DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); 

       GenericRecord payload = reader.read(null, decoder); 
       System.out.println("Message received --: " + payload); 

ma ottengo seguente eccezione quando il lettore provare a leggere il messaggio dal decoder .;

java.io.EOFException 
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473) 
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128) 
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259) 
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363) 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157) 
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) 
    at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

proprietà di consumo

enable.auto.commit=true 
auto.commit.interval.ms=101 
session.timeout.ms=7000 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
zookeeper.connect=zookeeper.xx.com\:2181 
heartbeat.interval.ms=1000 
auto.offset.reset=smallest 
serializer.class=kafka.serializer.DefaultEncoder 
bootstrap.servers=kafka.xx.com\:9092 
group.id=test 
consumer.timeout.ms=-1 
fetch.min.bytes=1 
receive.buffer.bytes=262144 

risposta

1

Il problema è prodotto dal produttore AVRO.

Nel metodo sendFile(), non si sta svuotando il codificatore e non si chiude ByteArrayOutputStream(), causando EOFException.

Ecco un esempio di una classe di serializzazione generico:

public class TestSerializer<T> { 



    final private Class<T> avroType; 

    public TestSerializer(Class<T> avroType) { 
     this.avroType = avroType; 
    } 

    public byte[] serialize(T object) 
    { 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     DatumWriter<T> writer = new SpecificDatumWriter<T>(avroType); 
     try 
     { 
      writer.write(object, encoder); 
      out.close(); 
     } catch (IOException e) 
     { 
      throw new RuntimeException(e); 
     } finally 
     { 
      //Here is the flushing and closing 
      try 
      { 
       if (encoder != null) 
       { 
        encoder.flush(); 
       } 
       if (out != null) 
       { 
        out.close(); 
       } 
      } catch (IOException e) 
      { 
       throw new RuntimeException(e); 
      } 
     } 

     return out.toByteArray(); 

    } 

}