2015-07-10 3 views
15


Sto cercando di utilizzare l'approccio di Spark Kafka Direct Stream. Semplifica il parallelismo creando il maggior numero di partizioni RDD come partizione topic di kafka, come indicato in questo doc. Sulla base delle mie conoscenze, spark creerà un esecutore per ogni partizione RDD per eseguire il calcolo.Spark Kafka Direct DStream - Quanti esecutori e partizioni RDD nella modalità cluster-cluster se sono impostati i num-executor?

Così, quando invio l'applicazione in modalità cluster e specifichi l'opzione num-executors in un valore diverso rispetto al numero di partizioni, quanti esecutori ci saranno?

Ad esempio, v'è un argomento Kafka con 2 partizioni, e io specificare num-esecutori a 4:

export YARN_CONF_DIR=$HADOOP_HOME/client_conf 

./bin/spark-submit \ 
--class playground.MainClass \ 
--master yarn-cluster \ 
--num-executors 4 \ 
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \ 
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1 

lo faccio fare un tentativo e scoprire il numero di esecutori è 4, e ogni esecutore legge ed elabora i dati da kafka. Perché? Ci sono solo 2 partizioni in kafka topic, come fa 4 esecutori a leggere dall'argomento kafka, che ha solo 2 partizioni?

Di seguito sono riportati i dettagli dell'applicazione spark e dei registri.

La mia applicazione scintilla, che stampa i messaggi ricevuti (in flatMap metodo) da Kafka in ogni esecutore:

... 
    String brokers = args[0]; 
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(","))); 
    kafkaParams.put("metadata.broker.list", brokers); 

    JavaPairInputDStream<String, String> messages = 
     KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, 
      kafkaParams, topicsSet); 

    JavaPairDStream<String, Integer> wordCounts = 
     messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() 
     { 
      public Iterable<String> call(Tuple2<String, String> tuple) throws Exception 
      { 
       System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(), 
        tuple._2())); // print the kafka message received in executor 
       return Arrays.asList(SPACE.split(tuple._2())); 
      } 

     }).mapToPair(new PairFunction<String, String, Integer>() 
     { 
      public Tuple2<String, Integer> call(String word) throws Exception 
      { 
       System.out.println(String.format("[word]: %s", word)); 
       return new Tuple2<String, Integer>(word, 1); 
      } 

     }).reduceByKey(new Function2<Integer, Integer, Integer>() 
     { 
      public Integer call(Integer v1, Integer v2) throws Exception 
      { 
       return v1 + v2; 
      } 

     }); 

    wordCounts.print(); 

    Runtime.getRuntime().addShutdownHook(new Thread(){ 
     @Override 
     public void run(){ 
      System.out.println("gracefully shutdown Spark!"); 
      jssc.stop(true, true); 
     } 
    }); 
    jssc.start(); 
    jssc.awaitTermination(); 

mio Kafka argomento, con 2 partizioni. String "ciao ciao parola 1", "ciao ciao parola 2", "ciao ciao parola 3", ... vengono inviati all'argomento.

Topic: topic_2 PartitionCount:2 ReplicationFactor:2 Configs: 
Topic: topic_2 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 
Topic: topic_2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 

Webconsle: uscita enter image description here

console di esecutore 1: uscita

... 
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12 
[word]: hello 
[word]: hello 
[word]: world 
[word]: 12 
... 

console di esecutore 2:

... 
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2 
[word]: hello 
[word]: hello 
[word]: world 
[word]: 2 
... 

output della console di esecutore 3:

... 
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3 
[word]: hello 
[word]: hello 
[word]: world 
[word]: 3 
... 
+0

Stampo il numero di partizioni in ciascun RDD. Ha lo stesso valore dei numeri di partizione di kafka topic, che è 2 nel mio caso. In che modo 3 esecutori possono elaborare in parallelo una serie di RDD, che hanno in totale due partizioni? In base all'output della console di ciascun executor, tutti gli executors elaborano i dati da RDD. – yzandrew

+0

Dato che DStream è una serie di RDD, forse per alcune finestre temporali, gli RDD vengono elaborati in 2 dei 3 esecutori.E in un altro momento Windows, gli RDD vengono elaborati in altri 2 dei 3 esecutori? Ho ragione? – yzandrew

risposta

5

Ogni partizione è operato da un esecutore alla volta (ammesso che non si dispone di esecuzione speculativa acceso).

Se si dispone di più esecutori di quelli delle partizioni, non tutti lavoreranno su un determinato RDD. Ma come hai notato, dal momento che un DStream è una sequenza di RDD, nel tempo ogni esecutore farà del lavoro.

+0

Cosa succederà se il numero di partizioni è maggiore di un numero di esecutori? – Knight71

+0

Una volta che un esecutore ha terminato di lavorare su una partizione, ne verrà assegnato un altro. –

+0

@CodyKoeninger Ciao, ho un problema: supponiamo che ci siano 15 partizioni kafka e 15 esecutori ciascuno con 8 core, a volte (nella maggior parte dei casi funziona bene) solo 3 esecutori ricevono compiti come 3 * 8> 15. Ma mi piacerebbe fare in modo che ogni esecutore si prenda cura di una partizione di kafka. È possibile farlo? (anche partizionando il rdd al 196, gli altri esecutori non ottengono il compito, sto usando la scintilla 1.6.2) –