Sto cercando di utilizzare l'approccio di Spark Kafka Direct Stream. Semplifica il parallelismo creando il maggior numero di partizioni RDD come partizione topic di kafka, come indicato in questo doc. Sulla base delle mie conoscenze, spark creerà un esecutore per ogni partizione RDD per eseguire il calcolo.Spark Kafka Direct DStream - Quanti esecutori e partizioni RDD nella modalità cluster-cluster se sono impostati i num-executor?
Così, quando invio l'applicazione in modalità cluster e specifichi l'opzione num-executors in un valore diverso rispetto al numero di partizioni, quanti esecutori ci saranno?
Ad esempio, v'è un argomento Kafka con 2 partizioni, e io specificare num-esecutori a 4:
export YARN_CONF_DIR=$HADOOP_HOME/client_conf
./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1
lo faccio fare un tentativo e scoprire il numero di esecutori è 4, e ogni esecutore legge ed elabora i dati da kafka. Perché? Ci sono solo 2 partizioni in kafka topic, come fa 4 esecutori a leggere dall'argomento kafka, che ha solo 2 partizioni?
Di seguito sono riportati i dettagli dell'applicazione spark e dei registri.
La mia applicazione scintilla, che stampa i messaggi ricevuti (in flatMap metodo) da Kafka in ogni esecutore:
...
String brokers = args[0];
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
kafkaParams.put("metadata.broker.list", brokers);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topicsSet);
JavaPairDStream<String, Integer> wordCounts =
messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
{
public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
{
System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(),
tuple._2())); // print the kafka message received in executor
return Arrays.asList(SPACE.split(tuple._2()));
}
}).mapToPair(new PairFunction<String, String, Integer>()
{
public Tuple2<String, Integer> call(String word) throws Exception
{
System.out.println(String.format("[word]: %s", word));
return new Tuple2<String, Integer>(word, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>()
{
public Integer call(Integer v1, Integer v2) throws Exception
{
return v1 + v2;
}
});
wordCounts.print();
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run(){
System.out.println("gracefully shutdown Spark!");
jssc.stop(true, true);
}
});
jssc.start();
jssc.awaitTermination();
mio Kafka argomento, con 2 partizioni. String "ciao ciao parola 1", "ciao ciao parola 2", "ciao ciao parola 3", ... vengono inviati all'argomento.
Topic: topic_2 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: topic_2 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: topic_2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Webconsle: uscita
console di esecutore 1: uscita
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...
console di esecutore 2:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...
output della console di esecutore 3:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...
Stampo il numero di partizioni in ciascun RDD. Ha lo stesso valore dei numeri di partizione di kafka topic, che è 2 nel mio caso. In che modo 3 esecutori possono elaborare in parallelo una serie di RDD, che hanno in totale due partizioni? In base all'output della console di ciascun executor, tutti gli executors elaborano i dati da RDD. – yzandrew
Dato che DStream è una serie di RDD, forse per alcune finestre temporali, gli RDD vengono elaborati in 2 dei 3 esecutori.E in un altro momento Windows, gli RDD vengono elaborati in altri 2 dei 3 esecutori? Ho ragione? – yzandrew