2016-03-15 39 views
7

Quando ho tentato di eseguire Kafka Consumer with Avro sui dati con il mio schema rispettivo, restituisce un errore di "AvroRuntimeException: dati non validi. La lunghezza è negativa: -40". Vedo che altri hanno avuto problemi simili coverting byte array to json, Avro write and read e Kafka Avro Binary *coder. Ho anche fatto riferimento questa Consumer Group Example, che sono stati tutti utili, ma nessun aiuto con questo errore finora .. Funziona fino a questa parte del codice (linea 73)Problemi di Kafka Avro Consumer with Decoder

Decoder = DecoderFactory.get(). BinaryDecoder (byteArrayInputStream, null);

Ho provato altri decoder e stampato il contenuto della variabile byteArrayInputStream che mostra come ritengo che i dati avro serializzati debbano essere visualizzati (nel messaggio riesco a vedere lo schema e alcuni dati e alcuni dati non corretti). la stampa dei byte disponibili utilizzando il metodo .available(), che restituisce 594. Ho difficoltà a capire perché questo errore sta accadendo. Apache Nifi è usato per produrre il flusso di Kafka con lo stesso schema di hdf. Gradirei qualsiasi aiuto.

risposta

13

Forse il problema è una discrepanza tra il modo in cui i dati Avro vengono scritti (codificati) da Nifi e in che modo l'app dell'utente sta leggendo (decodificando) i dati.

In poche parole, le API di Avro offre due diversi approcci alla serializzazione:

  1. Per la creazione di adeguati Avro file: Per codificare i set di dati, ma anche per incorporare lo schema Avro in una sorta di preambolo (via org.apache.avro.file.{DataFileWriter/DataFileReader}). L'incorporamento dello schema nei file Avro ha molto senso perché (a) in genere il "carico utile" dei file Avro è di ordini di grandezza maggiore dello schema Avro incorporato e (b) è possibile quindi copiare o spostare tali file a proprio piacimento e stai ancora sicuro di poterli leggere di nuovo senza consultare qualcuno o qualcosa.
  2. Per codificare solo i record di dati, ad esempio per non incorporare lo schema (tramite org.apache.avro.io.{BinaryEncoder/BinaryDecoder}; notare la differenza nel nome del pacchetto: io qui in confronto a file precedente). Questo approccio è spesso favorito quando i messaggi di codifica Avro vengono scritti su un argomento di Kafka, ad esempio, perché rispetto alla variante 1 sopra riportata non si incorre nel reinserimento dello schema di Avro in ogni singolo messaggio, assumendo che il proprio (molto ragionevole) è che, per lo stesso argomento di Kafka, i messaggi sono formattati/codificati con lo stesso schema Avro. Questo è un vantaggio significativo perché, in un contesto di dati di flusso, un record di dati in movimento è in genere molto più piccolo (comunemente tra 100 byte e poche centinaia di KB) rispetto ai file Avro dati-a riposo come descritto sopra (spesso centinaia o migliaia di MB); quindi la dimensione dello schema Avro è relativamente grande, e quindi non si vuole incorporarlo 2000x quando si scrivono 2000 record di dati su Kafka. Lo svantaggio è che devi "in qualche modo" tenere traccia di come gli schemi di Avro mappano agli argomenti di Kafka - o più precisamente, devi in ​​qualche modo tracciare con quale schema di Avro un messaggio è stato codificato senza percorrere il percorso di incorporamento dello schema direttamente. La buona notizia è che ci sono tooling available in the Kafka ecosystem (Avro schema registry) per farlo in modo trasparente. Quindi rispetto alla variante 1, la variante 2 guadagna efficienza a scapito della convenienza.

L'effetto è che il "formato filo" per dati Avro codificati apparirà diverso a seconda che si utilizzi (1) o (2) sopra.

Non ho molta familiarità con Apache Nifi, ma una rapida occhiata al codice sorgente (ad esempio ConvertAvroToJSON.java) mi suggerisce che sta usando la variante 1, cioè incorpora lo schema Avro accanto ai record Avro. Il tuo codice cliente, tuttavia, utilizza DecoderFactory.get().binaryDecoder() e quindi la variante 2 (senza schema incorporato).

Forse questo spiega l'errore che si è verificato?

+1

GRAZIE @miguno era esattamente così! Sto oscillando e rotolando usando il Decoder per il DataFileReader con due cambi di riga. DatumReader datumReader = new SpecificDatumReader (schema); DataFileStream dataFileReader = new DataFileStream (inputStream, datumReader); Correzione – SparkleGoat

+0

* Sto oscillando e rotolando ora che sono passato a DataFileReader con due cambi di riga. Hai ragione binaryDecoder non era la scelta giusta per il lavoro. – SparkleGoat

+1

Felice che abbia funzionato! –