2015-09-17 11 views
5

Uso lo streaming di Spark per contare utenti unici. Io uso updateStateByKey, quindi ho bisogno di configurare una directory di checkpoint. Ho anche caricare i dati da posto di blocco mentre si avvia l'applicazione, come the example in the doc:Come configare il punto di controllo per ridistribuire l'applicazione spark streaming?

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

Ecco la domanda, se il mio codice viene cambiato, quindi ho ri-distribuire il codice, sarà il punto di controllo da caricare, non importa quanto il codice è cambiato? O ho bisogno di usare la mia logica per perseverare i miei dati e caricarli nella prossima esecuzione.

Se utilizzo la mia logica per salvare e caricare DStream, se l'applicazione si riavvia in caso di errore, i dati non verranno caricati sia dalla directory checkpoint sia dal mio database?

risposta

3

Il punto di controllo include i metadati, rdd, dag e anche la logica. Se si modifica la logica e si tenta di eseguirla dall'ultimo checkpoint, è molto probabile che si verifichi un'eccezione. Se si desidera utilizzare la propria logica per salvare i dati da qualche parte come punto di controllo, potrebbe essere necessario implementare un'azione scintilla per inviare i dati del checkpoint a qualsiasi database, nell'esecuzione successiva, caricare i dati del checkpoint come un RDD iniziale (nel caso stai usando l'API updateStateByKey) e continui la tua logica.

2

Ho posto questa domanda nell'elenco Spark mail e ho ottenuto una risposta, l'ho analizzata su my blog. Pubblicherò qui il riepilogo:

Il modo è utilizzare sia il checkpoint che il nostro meccanismo di caricamento dei dati. Ma carichiamo i nostri dati come initalRDD di updateStateByKey. Quindi, in entrambe le situazioni, i dati saranno né perso né duplicate:

  1. Quando cambiamo il codice e ridistribuire l'applicazione Spark, abbiamo l'arresto la vecchia applicazione Spark con grazia e la pulizia dei dati del punto di controllo, quindi l'unico dati caricati è i dati che abbiamo salvato.

  2. Quando l'applicazione Spark è in errore e si riavvia, carica i dati dal punto di controllo. Ma il passaggio di DAG viene salvato in modo che non carichi nuovamente i nostri dati come initalRDD. Quindi gli unici dati caricati sono i dati del checkpoint.