Sto utilizzando Apache Spark Streaming 1.6.1 per scrivere un'applicazione Java che unisce due flussi di dati Key/Value e scrive l'output su HDFS. I due flussi di dati contengono stringhe K/V e vengono periodicamente ingeriti in Spark da HDFS utilizzando textFileStream().Come trasportare flussi di dati su più intervalli batch in Spark Streaming
I due flussi di dati non sono sincronizzati, il che significa che alcuni tasti che sono nello stream1 al tempo t0 possono apparire nello stream2 al tempo t1 o viceversa. Quindi, il mio obiettivo è quello di unire i due flussi e calcolare le chiavi "rimanenti", che dovrebbero essere considerate per l'operazione di join nei successivi intervalli batch.
Per chiarire meglio questo, guarda il seguente algoritmo:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
Ho cercato di implementare questo algoritmo con Spark Streaming senza successo. Inizialmente, Creo due flussi vuote per chiavi rimanenti in questo modo (questo è solo un flusso, ma il codice per generare il secondo flusso è simile):
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
Successivamente, questo flusso vuoto è unificato (cioè, union()) con stream1 e infine, dopo il join, aggiungo le chiavi rimanenti da stream1 e call window(). Lo stesso accade con stream2.
Il problema è che le operazioni che generano left_keys_s1 e left_keys_s2 sono trasformazioni senza azioni, il che significa che Spark non crea alcun diagramma di flusso RDD e, di conseguenza, non vengono mai eseguiti. Quello che ottengo in questo momento è un join che emette solo i record le cui chiavi sono in stream1 e stream2 nello stesso intervallo di tempo.
Avete qualche suggerimento per implementarlo correttamente con Spark?
Grazie, Marco