2015-07-19 4 views
12

Sto cercando di capire come rendere un'app Spark Streaming più Fault Tolerant (in particolare quando si tenta di scrivere su dipendenze downstream), e non so quale sia il modo migliore è quello di gestire i fallimenti nel tentativo di scrivere i risultati su una fonte esterna, come Cassandra, DynamoDB, ecc.Apache Spark Streaming, Come gestire gli errori di dipendenza downstream

Ad esempio, ho un lavoro Spark Streaming che estrae i dati da un flusso (Kafka, Flume, ecc ... I haven ' t finalizzato quale tecnologia utilizzare ancora), aggrega elementi simili insieme e quindi scrive i risultati in un negozio esterno. (cioè Cassandra, DynamoDB, o qualunque cosa stia ricevendo i risultati dei miei calcoli DStream).

Sto cercando di capire come gestisco il caso in cui la dipendenza esterna non è disponibile per la scrittura. Forse il cluster è andato giù, forse ci sono problemi di autorizzazione, ecc., Ma il mio lavoro non può scrivere i risultati sulla dipendenza esterna. C'è un modo per sospendere Spark Streaming in modo che i ricevitori non continuino a dati batch? Dovrei semplicemente dormire il batch corrente e lasciare che il ricevitore continui a memorizzare i lotti? Se il problema è transitorio (alcuni secondi), continuare a eseguire il batch può essere accettabile, ma cosa succede se la dipendenza diminuisce per alcuni minuti o più di 1 ora/e?

Un pensiero che avevo era di avere un processo di monitoraggio che controlla la salute delle dipendenze in background, e se scopre che è "malsano", fermerà il lavoro. Quindi, quando tutte le dipendenze sono in buona salute, posso avviare il processo di backup ed elaborare tutti i dati che non sono stati scritti sull'origine esterna.

Un altro pensiero che ho avuto è stato in qualche modo segnalare nel metodo DStream forEachRdd, che c'era un problema. C'è qualche eccezione che posso lanciare nel DStream che segnalerà al guidatore che dovrebbe fermarsi?

Se qualcuno ha esperienza su come gestire la tolleranza ai guasti esterni o può indirizzarmi a buoni articoli/video su di esso, sarebbe fantastico.

Grazie

+2

Non sono sicuro, ma il ricevitore/storage dati downstream dovrebbe gestire gli stessi errori? è al di là della responsabilità di Spark di preoccuparsene. Inoltre, se si verifica un errore, si tratta più di monitoraggio e avviso, in modo che gli ingegneri possano essere informati e controllare immediatamente l'errore. – keypoint

risposta

1

Credo non ci sia una risposta semplice e universale qui. Molto dipende dalla semantica delle applicazioni, dal tipo di fonti di dati (ricevitore affidabile, ricevitore affidabile, basato su file, ricevitore-meno) e requisiti.

In generale, non si dovrebbe mai consentire l'errore dell'applicazione su un singolo errore IO. Supponendo di avere una certa azione:

outputAction[T](rdd: RDD[T]): Unit = ??? 

almeno fare in modo che non si propaga un'eccezione al guidatore.

outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ??? 

stream foreachRDD { rdd => Try(outputAction(rdd)) } 

La domanda rimane la prossima. La cosa più semplice che puoi fare è eliminare la finestra data. A seconda dell'applicazione può essere una soluzione accettabile o meno, ma in generale ci sono molti casi in cui perdere alcuni dati è perfettamente accettabile.

Può essere ulteriormente migliorato tenendo traccia dei guasti e intraprendendo qualche altra azione se è stata raggiunta una soglia.

Se cadere dati non è accettabile passo successivo è quello di ritentare dopo un certo ritardo:

outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ??? 

stream foreachRDD { 
    rdd => Try(outputAction(rdd)) 
    .recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) } 
    .recoverWith { case _ => Try(outputActionWithDelay(d2)(rdd)) } 
    ... 
} 

Numero di tentativi e ritardare la durata varia da caso a caso e depnds sulla fonte e la capacità di memorizzare i dati in arrivo.

Cosa si può fare quando si preme l'ultimo tentativo? Per i principianti possiamo aggiungere una fonte di output alternativa. Invece di utilizzare la fonte primaria, è possibile, ad esempio, inviare tutto a un archivio di file esterno affidabile e preoccuparsene in seguito. Questo potrebbe non essere applicabile se la sorgente di output richiede un ordine specifico di dati in entrata, ma altrimenti dovrebbe valere la pena provare.

alternativeOutputAction[T](rdd: RDD[T]) = ??? 

stream foreachRDD { 
    rdd => Try(outputAction(rdd)) 
    .recoverWith { case _ => Try(outputActionWithDelay(d1) 
    ... 
    .recoverWith { case _ => Try(outputActionWithDelay(dn)(rdd)) } 
    .recoverWith { case _ => Try(alternativeOutputAction(rdd)) 
} 

Se fallisce è probabilmente un sintomo di gravi problemi e non c'è molto che possiamo fare a livello di applicazione. Possiamo tornare al primo approccio e sperare semplicemente che la situazione possa risolversi presto o scegliere un approccio più sofisticato.

Se la sorgente di input è in grado di memorizzare i dati e noi usiamo archiviazione e replica affidabili, possiamo enable checkpointing e chiudere semplicemente l'applicazione.

Se si tenta di ripristinarlo è probabilmente una buona idea aggiungere una variante di CircuitBreaker e se l'applicazione ha riscontrato più errori durante il tentativo di raggiungere i tentativi di recupero di uscita dell'output primario senza un ritardo.

1

Ora utilizzo lo streaming diretto e salvo gli offset da soli. Questo potrebbe non risolvere il tuo problema, almeno una volta che hai trovato alcuni problemi con la tua memoria esterna, puoi ricominciare da dove ti sei fermato.