2015-12-07 6 views
5

Sto riscontrando alcuni problemi durante il tentativo di leggere da Kafka con lo streaming di scintilla.Streaming di Spark Streaming Kafka

Il mio codice è:

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 

val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "localhost:2181", 
    "group.id" -> "consumergroup", 
    "metadata.broker.list" -> "localhost:9092", 
    "zookeeper.connection.timeout.ms" -> "10000" 
    //"kafka.auto.offset.reset" -> "smallest" 
) 

val topics = Set("test") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 

ho già iniziato a Zookeeper al porto 2181 e server di Kafka 0.9.0.0 sulla porta 9092. ma ottengo il seguente errore nel driver Spark:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) 
at scala.Option.map(Option.scala:145) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87) 

registro Zookeeper:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) 

Qualsiasi suggerimento?

La ringrazio molto

risposta

14

Il problema era legato alla versione errata scintilla-streaming-kafka.

Come descritto nel documentation

Kafka: Spark Streaming 1.5.2 è compatibile con Kafka 0.8.2.1

Così, tra cui

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 

nel mio pom.xml (invece della versione 0.9.0.0) risolto il problema.

Spero che questo aiuti

0

Kafka10 streaming/Spark 2.1.0/DCO/Mesosfera

Ugg ho passato tutto il giorno su questo ed aver letto questo post una dozzina di volte. Ho provato la scintilla 2.0.0, 2.0.1, Kafka 8, Kafka 10. Stai lontano da Kafka 8 e scintilla 2.0.x, e le dipendenze sono tutto. Inizia con sotto. Funziona.

SBT:

"org.apache.hadoop" % "hadoop-aws" % "2.7.3" excludeAll ExclusionRule(organization = "org.apache.hadoop", name = "hadoop-common"), 
"org.apache.spark" %% "spark-core" % "2.1.0", 
"org.apache.spark" %% "spark-sql" % "2.1.0" , 
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0", 
"org.apache.spark" % "spark-streaming_2.11" % "2.1.0" 

lavoro Kafka/Spark codice Streaming:

val spark = SparkSession 
    .builder() 
    .appName("ingest") 
    .master("local[4]") 
    .getOrCreate() 

import spark.implicits._ 
val ssc = new StreamingContext(spark.sparkContext, Seconds(2)) 

val topics = Set("water2").toSet 

val kafkaParams = Map[String, String](
    "metadata.broker.list"  -> "broker:port,broker:port", 
    "bootstrap.servers"   -> "broker:port,broker:port", 
    "group.id"     -> "somegroup", 
    "auto.commit.interval.ms"  -> "1000", 
    "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer", 
    "value.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer", 
    "auto.offset.reset"   -> "earliest", 
    "enable.auto.commit"   -> "true" 
) 

val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) 

messages.foreachRDD(rdd => { 
    if (rdd.count() >= 1) { 
    rdd.map(record => (record.key, record.value)) 
     .toDS() 
     .withColumnRenamed("_2", "value") 
     .drop("_1") 
     .show(5, false) 
    println(rdd.getClass) 
    } 
}) 
ssc.start() 
ssc.awaitTermination() 

prega come se si vede questo in modo da poter ottenere alcuni punti reputazione. :)