Ho un contesto di streaming di scintille che legge i dati degli eventi da kafka a intervalli di 10 secondi. Vorrei integrare questi dati degli eventi con i dati esistenti su una tabella postgres.Spark streaming di più fonti, ricaricare il dataframe
posso caricare la tabella postgres con qualcosa di simile:
val sqlContext = new SQLContext(sc)
val data = sqlContext.load("jdbc", Map(
"url" -> url,
"dbtable" -> query))
...
val broadcasted = sc.broadcast(data.collect())
E dopo posso attraversare in questo modo:
val db = sc.parallelize(data.value)
val dataset = stream_data.transform{ rdd => rdd.leftOuterJoin(db)}
Vorrei per mantenere attivo il flusso di dati corrente e ricaricare questa tabella ogni 6 ore. Dal momento che la scintilla di apache al momento non supporta più contesti di corsa come posso realizzare questo? C'è qualche soluzione? O avrò bisogno di riavviare il server ogni volta che voglio ricaricare i dati? Questo sembra un caso di uso così semplice ...:/
Sto cercando una risposta anche a questo, hai avuto qualche successo, @ user838681? –
Quando ricarichi la tabella Postgres, ti preoccupi degli eventi passati di kafka, o stai solo cercando di unirti a nuovi dati di kafka dal momento in cui è avvenuto l'ultimo aggiornamento da Postgres? –
@HamelKothari Non è necessario aggiornare o rielaborare eventi passati di Kafka. Quando ho aggiornato la tabella SQL voglio solo usarlo su qualsiasi evento futuro da Kafka. –