2013-08-27 17 views
7

Sto lavorando attraverso il quickstart Kafka:Kafka Quickstart: quali dipendenze ho bisogno?

http://kafka.apache.org/07/quickstart.html

e la base esempio Consumer Group:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

ho codificato il consumatore e ConsumerThreadPool come sopra:

import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerIterator; 

public class Consumer implements Runnable { 

    private KafkaStream m_stream; 
    private Integer m_threadNumber; 

    public Consumer(KafkaStream a_stream, Integer a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()) { 
      System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); 

     } 
     System.out.println("Shutting down Thread: " + m_threadNumber); 
    } 
} 

Un paio di altre sfaccettature: I am usi primavera ng per gestire il mio Zookeeper:

import javax.inject.Named; 
import java.util.Properties; 
import kafka.consumer.ConsumerConfig; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.ComponentScan; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
@ComponentScan("com.truecar.inventory.worker.core") 
public class AppConfig { 

    @Bean 
    @Named("consumerConfig") 
    private static ConsumerConfig createConsumerConfig() { 
     String zookeeperAddress = "127.0.0.1:2181"; 
     String groupId = "inventory"; 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeperAddress); 
     props.put("group.id", groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 
     return new ConsumerConfig(props); 
    } 
} 

E io sono la compilazione con Maven e il plugin OneJar Maven. Tuttavia, compilo e quindi eseguire il conseguente un vasetto ottengo il seguente errore:

Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters 
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning 
Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:792) 
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803) 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
at java.lang.Class.getDeclaredMethods0(Native Method) 
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521) 
at java.lang.Class.getDeclaredMethods(Class.java:1845) 
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180) 
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222) 
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165) 
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223) 
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630) 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461) 
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73) 
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31) 
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20) 
... 6 more 
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
... 27 more 

Ora, so poco di Kafka, e nulla di Scala. Come posso risolvere questo? Cosa dovrei provare dopo? È un problema noto? Ho bisogno di altre dipendenze? Ecco la versione Kafka nel mio pom.xml:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.9.2</artifactId> 
    <version>0.8.0-beta1</version> 
</dependency> 

Aggiornamento: Ho contattato la mailing list Kafka dev, e mi hanno fatto sapere alcuni requisiti versione specifica per le dipendenze Scala. Tuttavia, esiste anche una dipendenza log4j non documentata, che si traduce in un altro runtime, non in un tempo di compilazione, un'eccezione.

Exception in thread "main" java.lang.reflect.InvocationTargetException 
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V 
at org.apache.log4j.Category.log(Category.java:333) 
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177) 

Un altro aggiornamento:

ho trovato la corretta dipendenza log4j:

<dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 

ma ora sono incontrati con un'eccezione ancora più criptico runtime:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

A questo punto ho avuto il tipo di sentimento WTF. Così ho aggiunto un'altra dipendenza:

<dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 

Ma questa ennesima eccezione di runtime esposto:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge 
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146) 
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

Spero di essere in grado di ottenere questo esempio il bambino attivo e funzionante, ma forse questo è il prezzo da pagare per l'utilizzo di prodotti beta? Forse dovrei passare ad Apache Active MQ. Ma sembra meno divertente. Mi sto perdendo qualcosa?

risposta

9

Il problema è che kafka beta was built in a way that pom generated with a jar isn't valid and maven could not recognize it and parse properly recupera così le dipendenze transitive. Siamo riusciti a mitigare questo problema inserendo tutte le dipendenze da quel pom (scala, zk, ecc.) Nella nostra definizione pom. Stiamo aspettando le prossime versioni beta di kafka, in cui verrà risolto il problema.

L'elenco completo delle dipendenze è riportato di seguito. Nota che devi modificare la dipendenza della versione scala di conseguenza con il suffisso del tuo artefatto kafka.

<dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.15</version> 
      <exclusions> 
       <exclusion> 
        <groupId>com.sun.jmx</groupId> 
        <artifactId>jmxri</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>com.sun.jdmk</groupId> 
        <artifactId>jmxtools</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>javax.jms</groupId> 
        <artifactId>jms</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>net.sf.jopt-simple</groupId> 
      <artifactId>jopt-simple</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.6.4</version> 
     </dependency> 
     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-compiler</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.101tec</groupId> 
      <artifactId>zkclient</artifactId> 
      <version>0.3</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-core</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-annotation</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.easymock</groupId> 
      <artifactId>easymock</artifactId> 
      <version>3.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.scalatest</groupId> 
      <artifactId>scalatest</artifactId> 
      <version>1.2</version> 
      <scope>test</scope> 
     </dependency> 

Per quanto riguarda il

Maybe I should switch to Apache Active MQ. But that sounds less fun. Am I missing something?

Beh, non dimenticate che questa è la versione beta rilascio? Alcune cose brutte stanno accadendo, infatti, ma al momento stiamo eseguendo kafka 0.7 senza qualsiasi sforzo.

+0

Impressionante, grazie per la risposta. Mi piacerebbe provare 0.7, ma su Maven sono disponibili solo 0,8 barattoli. Cosa consiglieresti in termini di accesso programmatico? –

+0

@DavidWilliams usiamo [kafka costruito da twitter] (http://search.maven.org/#artifactdetails%7Ccom.twitter%7Ckafka_2.9.2%7C0.7.0%7Cjar) per 0.7. Cosa intendi per * accesso programmatico *? –

+0

Ah, dovrebbe essere più specifico, in Java. Gli unici artefatti Maven per Kafka sono 0,8 –

3

ho trovato questa configurazione delle dipendenze per essere funzionale:

<dependencies> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-core</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-context</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.8.0-beta1</version> 
    </dependency> 
    <dependency> 
     <groupId>javax.inject</groupId> 
     <artifactId>javax.inject</artifactId> 
     <version>1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.9.2</version> 
    </dependency> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.yammer.metrics</groupId> 
     <artifactId>metrics-core</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
</dependencies>