2016-03-10 49 views
7

Stiamo utilizzando Spark Streaming connesso allo stream di AWS Kinesis per aggregare (al minuto) le metriche che stiamo ricevendo e scrivere le aggregazioni su influxdb per renderle disponibili per una dashboard in tempo reale.Funzionamento dei checkpoint del ricevitore di scintilla Kinesis

Tutto sta funzionando bene, ma ora stiamo considerando come dovremmo gestire le pause per le distribuzioni e gli eventuali guasti del sistema.

I documenti dicono che la libreria di integrazione di Kinesis è già pronta per guasti, checkpoint e così via, ma vorrei chiarire come funziona il checkpoint.

Il ricevitore Kinesis crea un ingresso DStream utilizzando la libreria Kinesis Client (KCL) fornita da Amazon sotto la Amazon Software License (ASL). Il KCL si basa sull'SDK Java AWS con licenza Apache 2.0 e fornisce bilanciamento del carico, tolleranza agli errori, checkpoint attraverso i concetti di Worker, Checkpoint e Shard Leases.

Possiamo definire l'intervallo di checkpoint per kinesis, ma per quanto ho capito è usato solo per contrassegnare fino a che punto del flusso abbiamo consumato le metriche. Quindi, abbiamo ancora bisogno di usare la funzione di checkpoint dallo streaming spark, giusto?

Mentre stiamo aggregando i dati al minuto, il nostro intervallo di batch è 60 secondi ma durante questi 60 secondi riceviamo continuamente dati dal flusso.

Ecco le mie domande:

  • Quando eseguo JavaStreamingContext.stop (...) (al fine di distribuire una nuova versione del lavoro), il ricevitore verrà arrestato e sarà il punto di controllo essere aggiornato alla fine?
  • Quando si verificherà il checkpoint dello scintillio? Dopo ogni esecuzione del lavoro? Prima?
  • Supponendo che entrambi i checkpoint funzionino, come possiamo garantire la coerenza in caso di errore? Sembra che ogni volta che il checkpoint di streaming sta accadendo, ha bisogno di checkpoint per kinesis allo stesso tempo, altrimenti potremo terminare di leggere gli stessi dati di nuovo. Come possiamo gestire questo?
  • Se il servizio sottostante (in questo caso influxdb) non è attivo, cosa devo fare? Attuare un meccanismo di ripetizione? Se è così, deve smettere di riprovare dopo un po ', altrimenti avremo esaurito la memoria.

Grazie in anticipo!

risposta

0

Non sicuro al cento per cento che quella sarebbe una risposta completa alla tua domanda poiché la soluzione di checkpoint è una componente piuttosto complicata e ogni sottoquotazione potrebbe richiedere una domanda separata in SO. Eppure, forse questo avrebbe dato qualche indizio sul processo:

  • checkpoint lavora a livello DSTREAM, in modo che significa che è possibile eseguire posti di blocco sulle diverse fasi della pipeline. Può essere il punto in cui Spark crea il tuo primo RDD dai blocchi generati dal ricevitore o può essere il tuo RDD trasformato che puoi ottenere nelle fasi successive dopo aver calcolato le tue metriche.Quindi quando chiami stop (se lo interrompi con garbo) avrai lo stato del tuo checkpoint con l'ultimo RDD processato dopo che i tuoi ricevitori sono stati fermati nel punto che hai scelto nella tua pipeline

  • il checkpoint è attivato da Componente Spark chiamato JobGenerator. Prima di eseguire il lavoro genererà DStreams che calcoleranno gli RDD. In tale fase, se il checkpointing è configurato, ogni RDD di quel DStream creerà inoltre i metadati del checkpoint e l'RDD verrà contrassegnato come uno che richiede il checkpoint. Quindi SparkContext eseguirà i lavori generati e alla fine chiamerà il metodo doCheckpoint che manterrà i dati del checkpoint nella posizione configurata. JobGenerator creerà un lavoro separato per questo, quindi ti aspetterai una certa latenza tra il lavoro effettivo e la persistenza del punto di controllo

  • ogni volta che Spark eseguirà l'applicazione, creerà il contesto di streaming dai dati del punto di controllo. Quindi diciamo che se hai le tue metriche allo stato 7 per esempio sull'ultimo Spark spento dopo che i tuoi ricevitori Kenesis sono stati fermati, allora quando il tuo contesto di streaming sarà ripristinato, sarà di nuovo nello stato 7 e solo il prossimo batch generato dai nuovi dati di kenesis lo posizioneremo allo stato 8

  • bene, questo è il modo in cui architetti il ​​tuo prodotto. Probabilmente ha senso fare il checkpoint solo dopo che i tuoi dati sono stati gestiti con successo dalla tua dipendenza (della causa suggerirei di applicare un meccanismo di tentativi per evitare problemi di connettività a breve termine). Ma questo è troppo poco informazioni per darti una risposta completa su quello