2016-05-20 22 views
5

Sto utilizzando Apache Spark Streaming 1.6.1 per scrivere un'applicazione Java che unisce due flussi di dati Key/Value e scrive l'output su HDFS. I due flussi di dati contengono stringhe K/V e vengono periodicamente ingeriti in Spark da HDFS utilizzando textFileStream().Come trasportare flussi di dati su più intervalli batch in Spark Streaming

I due flussi di dati non sono sincronizzati, il che significa che alcuni tasti che sono nello stream1 al tempo t0 possono apparire nello stream2 al tempo t1 o viceversa. Quindi, il mio obiettivo è quello di unire i due flussi e calcolare le chiavi "rimanenti", che dovrebbero essere considerate per l'operazione di join nei successivi intervalli batch.

Per chiarire meglio questo, guarda il seguente algoritmo:

variables: 
stream1 = <String, String> input stream at time t1 
stream2 = <String, String> input stream at time t1 
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0 
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0 

operations at time t1: 
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2) 
write out_stream to HDFS 
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2) 
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2) 

Ho cercato di implementare questo algoritmo con Spark Streaming senza successo. Inizialmente, Creo due flussi vuote per chiavi rimanenti in questo modo (questo è solo un flusso, ma il codice per generare il secondo flusso è simile):

JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context 
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>(); 
q.add(empty_rdd); 
JavaDStream<String> empty_dstream = jssc.queueStream(q); 
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() { 
           @Override 
           public scala.Tuple2<String, String> call(String s) { 
            return new scala.Tuple2(s, s); 
           } 
           }); 

Successivamente, questo flusso vuoto è unificato (cioè, union()) con stream1 e infine, dopo il join, aggiungo le chiavi rimanenti da stream1 e call window(). Lo stesso accade con stream2.

Il problema è che le operazioni che generano left_keys_s1 e left_keys_s2 sono trasformazioni senza azioni, il che significa che Spark non crea alcun diagramma di flusso RDD e, di conseguenza, non vengono mai eseguiti. Quello che ottengo in questo momento è un join che emette solo i record le cui chiavi sono in stream1 e stream2 nello stesso intervallo di tempo.

Avete qualche suggerimento per implementarlo correttamente con Spark?

Grazie, Marco

risposta

1

Deve essere possibile riporto valori da un lotto all'altro mantenendo un riferimento ad un RDD dove si svolgono noi tali valori.

Non tentare di unire gli stream utilizzando queueDStream, dichiarare invece un riferimento RDD mutabile che può essere aggiornato ad ogni intervallo di streaming.

Questo è un esempio:

In questo lavoro streaming, si parte con un RDD carring 100 interi. Ogni intervallo, i numeri casuali 10 vengono generati e sottratti per quei 100 interi iniziali. Questo processo continua fino a quando l'RDD iniziale con 100 elementi è vuoto. Questo esempio mostra come trasferire elementi da un intervallo all'altro.

import scala.util.Random 
    import org.apache.spark.streaming.dstream._ 

    val ssc = new StreamingContext(sparkContext, Seconds(2)) 

    var targetInts:RDD[Int] = sc.parallelize(0 until 100) 

    var loops = 0 

    // we create an rdd of functions that generate random data. 
    // evaluating this RDD at each interval will generate new random data points. 
    val randomDataRdd = sc.parallelize(1 to 10).map(_ =>() => Random.nextInt(100)) 

    val dstream = new ConstantInputDStream(ssc, randomDataRdd) 

    // create values from the random func rdd 

    dataDStream.foreachRDD{rdd => 
         loops += 1 
         targetInts = targetInts.subtract(rdd) 
         if (targetInts.isEmpty) {println(loops); ssc.stop(false)} 
         } 


    ssc.start() 

L'esecuzione di questo esempio e tramando contro loopstargetInts.count ha pronunciato la seguente tabella:

Removing 100 ints by generating random numbers

Spero che questo ti dà abbastanza una guida per implementare il caso d'uso completa.