2016-02-16 15 views
10

Sto usando il consumer di alto livello Python per Kafka e voglio sapere gli ultimi offset per ogni partizione di un argomento. Tuttavia non riesco a farlo funzionare.Come ottenere l'ultimo offset per una partizione per un argomento di kafka?

from kafka import TopicPartition 
from kafka.consumer import KafkaConsumer 

con = KafkaConsumer(bootstrap_servers = brokers) 
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] 

con.assign(ps) 
for p in ps: 
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) 

print "Subscription = %s"%con.subscription() 
print "con.seek_to_beginning() = %s"%con.seek_to_beginning() 

Ma l'uscita ottengo è

For partition 0 highwater is None 
For partition 1 highwater is None 
For partition 2 highwater is None 
For partition 3 highwater is None 
For partition 4 highwater is None 
For partition 5 highwater is None 
.... 
For partition 96 highwater is None 
For partition 97 highwater is None 
For partition 98 highwater is None 
For partition 99 highwater is None 
Subscription = None 
con.seek_to_beginning() = None 
con.seek_to_end() = None 

ho un approccio alternativo utilizzando assign ma il risultato è lo stesso

con = KafkaConsumer(bootstrap_servers = brokers) 
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] 

con.assign(ps) 
for p in ps: 
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) 

print "Subscription = %s"%con.subscription() 
print "con.seek_to_beginning() = %s"%con.seek_to_beginning() 
print "con.seek_to_end() = %s"%con.seek_to_end() 

Sembra da alcune della documentazione che potrei ottenere questo comportamento se non è stato emesso un fetch. Ma non riesco a trovare un modo per forzarlo. Che cosa sto facendo di sbagliato?

Oppure esiste un modo diverso/più semplice per ottenere gli ultimi offset per un argomento?

+0

non al 100% positivo, ma credo che il codice sta tornando il valore di highwater prima 'Kafka-python' ha effettivamente collegata al broker . Dato che 'KafkaConsumer' è asincrono, penso che devi effettivamente consumare un messaggio per il valore di highwater da compilare: https://github.com/dpkp/kafka-python/issues/509#issuecomment-178114516 –

risposta

23

Infine, dopo aver trascorso una giornata in questo e in diverse false partenze, sono riuscito a trovare una soluzione e farlo funzionare. Pubblicandolo in modo che altri possano riferirsi ad esso.

from kafka import SimpleClient 
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy 
from kafka.common import OffsetRequestPayload 

client = SimpleClient(brokers) 

partitions = client.topic_partitions[topic] 
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()] 

offsets_responses = client.send_offset_request(offset_requests) 

for r in offsets_responses: 
    print "partition = %s, offset = %s"%(r.partition, r.offsets[0]) 
+1

C'è un modo? per ottenere l'offset corrente/successivo per consumatore/gruppo per partizione? – GreenThumb

+0

Purtroppo, SimpleClient è stato dichiarato obsoleto e gli offset sopra riportati restituiscono un errore FailedPayloadsError: FailedPayloadsError – dreynold

9

Se si desidera utilizzare gli script di shell Kafka presenti in Kafka/bin, quindi è possibile ottenere gli ultimi e più piccoli offset utilizzando kafka-run-class.sh.

per le ultime comando di offset sarà simile a questa

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname 

Per ottenere più piccolo comando di offset sarà simile a questa

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname 

Potete trovare ulteriori informazioni su Get Offset Shell dal seguente link

Spero che questo aiuti!

4
from kafka import KafkaConsumer, TopicPartition 

TOPIC = 'MYTOPIC' 
GROUP = 'MYGROUP' 
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092'] 

consumer = KafkaConsumer(
     bootstrap_servers=BOOTSTRAP_SERVERS, 
     group_id=GROUP, 
     enable_auto_commit=False 
    ) 


for p in consumer.partitions_for_topic(TOPIC): 
    tp = TopicPartition(TOPIC, p) 
    consumer.assign([tp]) 
    committed = consumer.committed(tp) 
    consumer.seek_to_end(tp) 
    last_offset = consumer.position(tp) 
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, (last_offset - committed))) 

consumer.close(autocommit=False) 
0

Un altro modo per raggiungere questo obiettivo è polling il consumatore per ottenere l'ultimo consumato offset e quindi utilizzando il metodo seek_to_end per ottenere il più recente di partizione di offset disponibili.

from kafka import KafkaConsumer 
consumer = KafkaConsumer('my-topic', 
        group_id='my-group', 
        bootstrap_servers=['localhost:9092']) 
consumer.poll() 
consumer.seek_to_end() 

Questo metodo è particolarmente utile quando si utilizzano gruppi di consumatori.

FONTI:

  1. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.poll
  2. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.seek_to_end
+0

Il mio server ha centinaia di messaggi, tuttavia consumer.poll() restituito {} – Nick