12

Ho un contesto di streaming di scintille che legge i dati degli eventi da kafka a intervalli di 10 secondi. Vorrei integrare questi dati degli eventi con i dati esistenti su una tabella postgres.Spark streaming di più fonti, ricaricare il dataframe

posso caricare la tabella postgres con qualcosa di simile:

val sqlContext = new SQLContext(sc) 
val data = sqlContext.load("jdbc", Map(
    "url" -> url, 
    "dbtable" -> query)) 

...

val broadcasted = sc.broadcast(data.collect()) 

E dopo posso attraversare in questo modo:

val db = sc.parallelize(data.value) 
val dataset = stream_data.transform{ rdd => rdd.leftOuterJoin(db)} 

Vorrei per mantenere attivo il flusso di dati corrente e ricaricare questa tabella ogni 6 ore. Dal momento che la scintilla di apache al momento non supporta più contesti di corsa come posso realizzare questo? C'è qualche soluzione? O avrò bisogno di riavviare il server ogni volta che voglio ricaricare i dati? Questo sembra un caso di uso così semplice ...:/

+0

Sto cercando una risposta anche a questo, hai avuto qualche successo, @ user838681? –

+0

Quando ricarichi la tabella Postgres, ti preoccupi degli eventi passati di kafka, o stai solo cercando di unirti a nuovi dati di kafka dal momento in cui è avvenuto l'ultimo aggiornamento da Postgres? –

+0

@HamelKothari Non è necessario aggiornare o rielaborare eventi passati di Kafka. Quando ho aggiornato la tabella SQL voglio solo usarlo su qualsiasi evento futuro da Kafka. –

risposta

1

A mio modesto parere, il ricaricamento di un'altra origine dati durante le trasformazioni su DStreams non è raccomandato dalla progettazione.

Rispetto ai modelli tradizionali di lavorazione stateful di streaming, D-Streams è progettato per strutturare un calcolo di streaming come una serie di stateless, deterministic calcoli lotti su intervalli di tempo piccole.

Le trasformazioni su DStreams sono deterministiche e questo design consente il ripristino rapido da guasti mediante ricalcolo. L'aggiornamento porterà effetti collaterali al ripristino/ricomposizione.

Una soluzione è di posticipare la query alle operazioni di output, ad esempio: foreachRDD(func).