2014-12-17 5 views
7

Sto provando a eseguire computazioni Spark Streaming stateful su (falsi) registri del server Web Apache letti da Kafka. L'obiettivo è quello di "sincronizzare" il traffico web in modo simile a this blog postSpark Streaming groupByKey e updateStateByKey Implementazione

L'unica differenza è che voglio "eseguire la sessione" di ogni pagina su IP, invece dell'intera sessione. Sono stato in grado di fare questa lettura da un file di traffico web fasullo usando Spark in modalità batch, ma ora voglio farlo in un contesto di streaming.

file di registro vengono letti da Kafka e analizzati in K/V coppie di (String, (String, Long, Long)) o

(IP, (requestPage, time, time)).

Quindi chiamo groupByKey() su questo K/V pair. In modalità batch, ciò produrrebbe un:

(String, CollectionBuffer((String, Long, Long), ...) o

(IP, CollectionBuffer((requestPage, time, time), ...)

in uno StreamingContext, esso produce un:

(String, ArrayBuffer((String, Long, Long), ...) modo:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000))) 

Tuttavia, come la prossima microbatch (DStream) arriva, questa informazione viene scartata.

In definitiva quello che voglio è che lo ArrayBuffer si riempia nel tempo in quanto un dato IP continua ad interagire e ad eseguire alcuni calcoli sui suoi dati per "sincronizzare" il tempo della pagina.

Credo che l'operatore che accada sia "updateStateByKey". Sto avendo qualche problema con questo operatore (sono nuovo ad entrambi Spark & Scala);

qualsiasi aiuto è apprezzato.

Finora:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
          a: Seq[(String, ArrayBuffer[(String, Long, Long)])], 
          b: Option[(String, ArrayBuffer[(String, Long, Long)])] 
         ): Option[(String, ArrayBuffer[(String, Long, Long)])] = { 

    } 

risposta

2

risposta di Gabor mi ha iniziato la strada giusta, ma qui è una risposta che produce il previsto produzione.

In primo luogo, per l'uscita che voglio:

(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000))) 

Non ho bisogno di groupByKey(). updateStateByKey accumula già i valori in un Seq, quindi l'aggiunta di groupByKey non è necessaria (e costosa). Gli utenti di Spark suggeriscono fortemente di non usare groupByKey.

Qui è il codice che ha lavorato:

def updateValues(newValues: Seq[(String, Long, Long)], 
         currentValue: Option[Seq[ (String, Long, Long)]] 
        ): Option[Seq[(String, Long, Long)]] = { 

    Some(currentValue.getOrElse(Seq.empty) ++ newValues) 

    } 


val grouped = ipTimeStamp.updateStateByKey(updateValues) 

Qui updateStateByKey viene passata una funzione (updateValues) che ha l'accumulo di valori nel tempo (newValues) così come opzione per il valore corrente nel flusso (valore corrente). Quindi restituisce la combinazione di questi.È richiesto getOrElse poiché currentValue potrebbe essere a volte vuoto. Credito a https://twitter.com/granturing per il codice corretto.

2

Penso che siete alla ricerca di qualcosa di simile:

def updateGroupByKey(
          newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])], 
          currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])] 
         ): Option[(String, ArrayBuffer[(String, Long, Long)])] = { 
    //Collect the values 
    val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = (for (v <- newValues) yield v._2) 
    val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 :: buffs 
    //Convert state to buffer 
    if (buffs2.isEmpty) None else { 
     val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1 
     Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((v, a) => v++a))) 
    } 
    }