2014-09-08 5 views
5

Recentemente ho notato che Camel ora ha il suo componente per Kafka, quindi ho deciso di dargli un vortice.Integrazione Camel Kafka

ho deciso di provare un bel semplice file -> topic Kafka come segue ...

<route> 
     <from uri="file:///tmp/input" /> 
     <setHeader headerName="kafka.PARTITION_KEY"> 
      <constant>Test</constant> 
     </setHeader> 
     <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" /> 
</route> 

Questo sembra abbastanza semplice, tuttavia, a correre questo ho ...

java.lang.ClassCastException: java.lang.String cannot be cast to [B 
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) 
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78) 

e il controllo sul codice Cammello, si comporta nel modo seguente ...

String msg = exchange.getIn().getBody(String.class); 
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg); 
producer.send(data); 

Ovviamente, questo è un problema di serializzazione, io sono solo non sono sicuro se c'è una soluzione alternativa o se questo è intrinsecamente un bug con l'implementazione esistente? (O forse solo il mio equivoco)

Qualche suggerimento? Grazie, J

risposta

10

Ah, non importa, eccoci qui ... Spero che questo aiuti qualcun altro, devi impostare il serializzatore nelle opzioni.

<route> 
      <from uri="file:///tmp/input" /> 
      <setHeader headerName="kafka.PARTITION_KEY"> 
       <constant>Test</constant> 
      </setHeader> 
      <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" /> 
</route> 
0

trovato un bel esempio per l'installazione e l'avvio di Apache Kafka, e la configurazione di un endpoint cammello per l'invio di un messaggio di Kafka topic-

@Override 
    public void configure() throws Exception { 

     String topicName = "topic=javainuse-topic"; 
     String kafkaServer = "kafka:localhost:9092"; 
     String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181"; 
     String serializerClass = "serializerClass=kafka.serializer.StringEncoder"; 

     String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&") 
       .append(zooKeeperHost).append("&").append(serializerClass).toString(); 

     from("file:C:/inbox?noop=true").split().tokenize("\n").to(toKafka); 
    } 

Reference Apache Camel + Kafka Integration example