Sto usando il consumer di alto livello Python per Kafka e voglio sapere gli ultimi offset per ogni partizione di un argomento. Tuttavia non riesco a farlo funzionare.Come ottenere l'ultimo offset per una partizione per un argomento di kafka?
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
Ma l'uscita ottengo è
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
ho un approccio alternativo utilizzando assign
ma il risultato è lo stesso
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
Sembra da alcune della documentazione che potrei ottenere questo comportamento se non è stato emesso un fetch
. Ma non riesco a trovare un modo per forzarlo. Che cosa sto facendo di sbagliato?
Oppure esiste un modo diverso/più semplice per ottenere gli ultimi offset per un argomento?
non al 100% positivo, ma credo che il codice sta tornando il valore di highwater prima 'Kafka-python' ha effettivamente collegata al broker . Dato che 'KafkaConsumer' è asincrono, penso che devi effettivamente consumare un messaggio per il valore di highwater da compilare: https://github.com/dpkp/kafka-python/issues/509#issuecomment-178114516 –