2015-04-22 3 views

risposta

12

È possibile farlo con l'aiuto della shell di zookeeper. Kafka usa il guardiano dello zoo per tracciare gli offset del consumatore.

Vai alla directory bin Kafka e richiamare shell Zookeeper. (La mia versione Kafka è 0.8.0)

./zookeeper-shell.sh localhost:2181 

Ora usare il guardiano dello zoo comando get

get /consumers/consumer_group_id/offsets/topic/0 

che mostra qualcosa di simile

2043 
cZxid = 0x4d 
ctime = Wed Mar 18 03:56:32 EDT 2015 
... 

Qui 2043 è lo scostamento massimo consumato. Impostare a valore desiderato utilizzando il comando Zookeeper

set /consumers/consumer_group_id/offsets/topic/0 10000 

impostare il percorso è incorniciata come questo/consumatori/[consumer_group_id]/offset/[argomento]/[partition_id].
Dovrai sostituire con il gruppo di consumatori, l'argomento e l'ID della partizione appropriati.

* Inoltre, dal momento che hai menzionato che si tratta di una nuova istanza di kafka, non sono sicuro che i consumatori si siano collegati e che i gruppi di consumatori siano stati creati.

+0

Ottima risposta, questo mi ha davvero aiutato nel mio progetto! –

1

Poiché gli offset di kafka 0.9 sono memorizzati in un argomento. Per cambiare offset, utilizzare il seek() method:

public void seek(TopicPartition partition, long offset) 

Sostituzioni l'offset che il consumatore usare sul prossimo poll(timeout) recupero. Se questa API viene richiamata per la stessa partizione più di una volta, l'ultimo offset verrà utilizzato al successivo poll(). Nota che potresti perdere i dati se questa API viene arbitrariamente utilizzata nel mezzo del consumo, per azzerare le compensazioni fetch