Sto provando a eseguire computazioni Spark Streaming stateful su (falsi) registri del server Web Apache letti da Kafka. L'obiettivo è quello di "sincronizzare" il traffico web in modo simile a this blog postSpark Streaming groupByKey e updateStateByKey Implementazione
L'unica differenza è che voglio "eseguire la sessione" di ogni pagina su IP, invece dell'intera sessione. Sono stato in grado di fare questa lettura da un file di traffico web fasullo usando Spark in modalità batch, ma ora voglio farlo in un contesto di streaming.
file di registro vengono letti da Kafka e analizzati in K/V
coppie di (String, (String, Long, Long))
o
(IP, (requestPage, time, time))
.
Quindi chiamo groupByKey()
su questo K/V pair
. In modalità batch, ciò produrrebbe un:
(String, CollectionBuffer((String, Long, Long), ...)
o
(IP, CollectionBuffer((requestPage, time, time), ...)
in uno StreamingContext, esso produce un:
(String, ArrayBuffer((String, Long, Long), ...)
modo:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
Tuttavia, come la prossima microbatch (DStream) arriva, questa informazione viene scartata.
In definitiva quello che voglio è che lo ArrayBuffer
si riempia nel tempo in quanto un dato IP continua ad interagire e ad eseguire alcuni calcoli sui suoi dati per "sincronizzare" il tempo della pagina.
Credo che l'operatore che accada sia "updateStateByKey
". Sto avendo qualche problema con questo operatore (sono nuovo ad entrambi Spark & Scala);
qualsiasi aiuto è apprezzato.
Finora:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}