2016-02-26 7 views
7

Dopo creare più consumatori (usando Kafka 0.9 Java API) e ciascun thread iniziato, sto ottenendo la seguente eccezioneKafka CommitFailedException eccezione consumatore

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance 
class com.messagehub.consumer.Consumer is shutting down. 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546) 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487) 
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681) 
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654) 
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350) 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157) 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352) 
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936) 
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905) 

e poi iniziare a consumare messaggio normalmente, vorrei sapere che cosa sta causando questa eccezione per risolverlo.

+0

Hugo, hai ancora riscontrato questo problema? puoi fornire qualche informazione in più? – Nautilus

+0

Sì @nautilus, sto ancora avendo questo problema. Ho 3 consumatori, tutti nello stesso gruppo di consumatori, ho un argomento con 20 partizioni, da cui i dati dovrebbero essere letti. Questa eccezione si verifica casualmente, tuttavia i consumatori possono leggere i dati da argomento/partizioni, sebbene questa eccezione sia attivata. –

+0

i consumatori stanno solo consumando i dati o lo stanno elaborando anche loro?Vedo sul tuo stacktrace che l'eccezione si verifica quando stai provando a commitSync dell'offset, puoi descrivere cosa succede tra il consumo dei messaggi e il commit dell'offset? Penso che sia possibile che il tuo consumatore stia perdendo battito cardiaco con il coordinatore. – Nautilus

risposta

-1

Problema di riequilibrio del gruppo di consumatori. Puoi dirci, con quante partizioni è stato creato l'argomento? e quanti consumatori stanno correndo? Appartengono allo stesso gruppo?

+0

Ho più partizioni di quel membro, quindi non dovrebbe un membro ricevere messaggi da più partizioni? Ho un argomento con 20 partizioni e 3 membri in esecuzione, tutti appartengono allo stesso gruppo di consumatori. –

10

Prova anche di modificare i seguenti parametri:

  • heartbeat.interval.ms - Questo dice Kafka attendere la quantità specificata di millisecondi prima di prendere in considerazione il consumatore sarà considerato "morto"
  • max.partition.fetch.bytes - Questo limiterà la quantità di messaggi (fino a) che il consumatore riceverà durante il polling.

Ho notato che il riequilibrio si verifica se il consumatore non si impegna a Kafka prima che l'heartbeat scada. Se il commit si verifica dopo l'elaborazione dei messaggi, la quantità di tempo per elaborarli determinerà questi parametri. Quindi, riducendo il numero di messaggi e aumentando il tempo del battito cardiaco, si eviterà di riequilibrare.

Considerare inoltre di utilizzare più partizioni, quindi ci saranno più thread che elaborano i dati, anche con meno messaggi per sondaggio.

Ho scritto questa piccola applicazione per fare dei test. Spero che sia d'aiuto.

https://github.com/ajkret/kafka-sample

UPDATE

Kafka 0.10.x offre ora un nuovo parametro per controllare il numero di messaggi ricevuti: - max.poll.records - Il numero massimo di record restituito in una sola chiamata al sondaggio().

UPDATE

Kafka offre un modo per pausa coda. Mentre la coda è in pausa, è possibile elaborare i messaggi in una Discussione separata, consentendo di chiamare KafkaConsumer.poll() per inviare heartbeat. Quindi chiamare KafkaConsumer.resume() al termine dell'elaborazione. In questo modo si attenuano i problemi di provocando i riequilibri a causa della mancata trasmissione di heartbeat. Ecco uno schema di ciò che puoi fare:

while(true) { 
    ConsumerRecords records = consumer.poll(Integer.MAX_VALUE); 
    consumer.commitSync(); 

    consumer.pause(); 
    for(ConsumerRecord record: records) { 

     Future<Boolean> future = workers.submit(() -> { 
      // Process 
      return true; 
     }); 


     while (true) { 
      try { 
       if (future.get(1, TimeUnit.SECONDS) != null) { 
        break; 
       } 
      } catch (java.util.concurrent.TimeoutException e) { 
       getConsumer().poll(0); 
      } 
     } 
    } 

    consumer.resume(); 
} 
+0

La versione 0.10.x ora ha un nuovo parametro, * max.poll.records *, da usare al posto di max.partition.fetch.bytes. – ajkret

+0

Ho usato lo stesso approccio di Pausa e Riprendi, ma ho ancora lo stesso errore. L'unica differenza è che sto chiamando commitSync() dopo pause() e prima di resume(), poiché ho bisogno di eseguire il commit solo se i record vengono elaborati. Qualche idea su cosa sto facendo male? –

+0

@ mav3n: sto riscontrando lo stesso problema. Ho provato ad aumentare session.timeout.ms e max.poll.records ma senza successo. Hai trovato il metodo per farlo? –