2015-09-04 3 views
80

Sono relativamente nuovo a Kafka. Ho fatto un po 'di esperimenti ma alcune cose non sono chiare per quanto riguarda l'offset del consumatore. Da quello che ho capito, quando un consumatore inizia, l'offset da cui inizierà a leggere è determinato dall'impostazione di configurazione auto.offset.reset (correggimi se ho torto).Che cosa determina l'offset del consumatore Kafka?

Ora, ad esempio, nell'argomento ci sono 10 messaggi (scostamenti da 0 a 9) e un consumatore ha consumato 5 di essi prima che scendesse (o prima che uccidessi il consumatore). Quindi dire che riavvio quel processo di consumo. Le mie domande sono:

Se auto.offset.reset è impostato su smallest, inizierà sempre a consumare dall'offset 0?

Se è impostato su largest, inizierà a consumare dall'offset 5?

Il comportamento di questo tipo di scenario è sempre deterministico? Per favore non esitate a commentare se qualcosa nella mia domanda non è chiara. Grazie in anticipo.

risposta

139

È un po 'più complesso di quello che hai descritto. La configurazione auto.offset.reset si attiva SOLO se il gruppo di consumatori non ha un offset valido eseguito da qualche parte (2 archivi di offset supportati ora sono Kafka e Zookeeper). E dipende anche dal tipo di consumatore che usi.

Se si utilizza un java consumatori di alto livello quindi immaginare seguenti scenari:

  1. Hai un consumatore in un gruppo di consumatori group1 che ha consumato 5 messaggi ed è morto. La prossima volta che avvierai questo utente non userà nemmeno quella configurazione auto.offset.reset e continuerà dal posto in cui è morto perché recupererà solo l'offset memorizzato dall'archivio offset (Kafka o ZK come ho detto).

  2. Si hanno messaggi in un argomento (come descritto in precedenza) e si avvia un utente in un nuovo gruppo di consumatori group2. Non v'è alcuna compensazione memorizzati da nessuna parte e questa volta la configurazione auto.offset.reset deciderà se avviare dall'inizio del tema (smallest) o dalla fine del tema (largest)

Un'altra cosa che colpisce quello che compensato il valore corrisponderà a smallest e largest configs è la politica di conservazione dei registri. Immagina di avere un argomento con memorizzazione configurata su 1 ora. Produci 5 messaggi e poi un'ora dopo pubblichi altri 5 messaggi. Lo scostamento largest rimarrà lo stesso dell'esempio precedente ma lo smallest non sarà in grado di essere 0 perché Kafka rimuoverà questi messaggi e quindi il più piccolo offset disponibile sarà 5.

Tutto quanto menzionato in precedenza non è correlato a SimpleConsumer e ogni volta che viene eseguito, deciderà da dove iniziare utilizzando la configurazione auto.offset.reset.

+2

Grazie molto per la risposta. Quindi, per quanto riguarda il consumatore di alto livello, una volta che un consumatore ha commesso qualcosa (sia in ZK o Kafka), il "auto.offset.reset' non ha alcun significato da quel momento in poi? L'unico significato di questa impostazione è quando non viene commesso nulla (e idealmente sarebbe al primo avvio del consumatore)? –

+1

Esattamente come hai descritto – serejja

+0

L'offset ha un ruolo con i registri di 'Compattazione'. – peaceUser

35

Solo un aggiornamento: da Kafka 0.9 e versioni successive, Kafka utilizza una nuova versione Java dell'utente e i nomi dei parametri auto.offset.reset sono stati modificati; Dal manuale:

Cosa fare quando non c'è offset Kafka o se la corrente di offset non esiste più sul server iniziale (per esempioquando i dati è stato eliminato):

prima: ripristina automaticamente l'offset per la prima compensazione

ultima: ripristina automaticamente l'offset per l'ultima Offset

nessuno: tiro eccezione per il consumatore se non viene trovato nessun offset precedente per il gruppo del consumatore

lse: lanciare un'eccezione al consumatore.

Ho trascorso un po 'di tempo a trovarlo dopo aver controllato la risposta accettata, quindi ho pensato che potesse essere utile per la comunità pubblicarla.

2

Ulteriori ulteriori dettagli sono offsets.retention.minutes. Se il tempo dall'ultima commit è>offsets.retention.minutes, oltre auto.offset.reset calci anche in

+0

non sembra ridondante con la conservazione del registro? la conservazione dovrebbe essere basata sulla conservazione dei log? – mike01010