2015-02-11 3 views
9

Ho una semplice Kafka dei consumatori in Java con il seguente codiceKafka consumatori appesi a .hasNext in java

public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()&& !done){ 
      try { 
       System.out.println("Parsing data"); 
       byte[] data = it.next().message(); 
       System.out.println("Found data: "+data); 
       values.add(data); // array list 
      } catch (InvalidProtocolBufferException e) { 
       e.printStackTrace(); 
      } 
     } 
     done = true; 
    } 

Quando un messaggio viene inviato, i dati vengono letti correttamente, tuttavia quando si torna a controllarlo .hasNext(), rimane sospeso e non ritorna mai.

Cosa potrebbe essere in fase di stallo?

m_stream si ottiene un KafkaStream come segue:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
topicCountMap.put(topic, new Integer(a_numThreads)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
executor = Executors.newFixedThreadPool(a_numThreads); 
for (final KafkaStream stream : streams) { 
    // m_stream is one of these streams 
} 

risposta

11

La soluzione era quella di aggiungere la proprietà

"consumer.timeout.ms"

Ora, quando viene raggiunto il timeout di un ConsumerTimeoutException è generata