2015-10-21 4 views
7

Sto lavorando a un'applicazione Spark Streaming basata su Java che risponde ai messaggi che arrivano attraverso un argomento di Kafka. Per ogni messaggio, l'applicazione esegue un po 'di elaborazione e scrive i risultati su un altro argomento di Kafka.Gestione eccezione non archiviata in Spark

A volte a causa di problemi imprevisti relativi ai dati, il codice che opera sugli RDD potrebbe non riuscire e generare un'eccezione. Quando ciò accade, mi piacerebbe avere un gestore generico che possa intraprendere le azioni necessarie e rilasciare un messaggio su un argomento di errore. Al momento, queste eccezioni sono scritte nel log di Spark da Spark stessa.

Qual è l'approccio migliore per farlo, invece di scrivere blocchi try-catch per ogni blocco di codice che funziona sugli RDD?

+0

Vedo che qualcuno ha colato un voto vicino dicendo questa domanda è opinione-based. Sarei grato se gli esperti potessero almeno fare un po 'di luce prima di esprimere voti ravvicinati se questo non fosse possibile con Spark al momento. Scegliere un voto ravvicinato senza una spiegazione non aiuta in alcun modo la comunità. –

+0

Si potrebbe scrivere una funzione generica che faccia questo. Hai solo bisogno di avvolgerlo attorno alle azioni RDD poiché quelle sono le uniche che possono lanciare eccezioni Spark (i trasformatori come .map e .filter sono pigri eseguiti da azioni). (Supponendo che questo sia in Scala) Potresti anche provare qualcosa con impliciti e un errore nella gestione della classe RDD arricchita che crei per imporre implicitamente il tuo errorhandling con il solo tipo di firme. Non ho votato a ruota libera, ma immagino che l'approccio "migliore" sia in qualche modo soggettivo rispetto alle esigenze applicative. – Rich

+0

Grazie a @Rich. Quindi, in pratica, quello che intendi dire è che in Spark ora non c'è modo per gestirlo, quindi ogni applicazione dovrebbe prendersene cura. Se potessi pubblicare il tuo commento come risposta, lo accetterò. –

risposta

3

È possibile scrivere una funzione generica che esegue questa operazione. È necessario solo avvolgerlo attorno alle azioni RDD poiché quelle sono le uniche che possono generare eccezioni Spark (i trasformatori come .map e .filter sono pigri eseguiti da azioni).

(Supponendo che questo sia in Scala) Potresti forse anche provare qualcosa con impliciti. Crea una classe che contiene un RDD e gestisce l'errore. Ecco uno schizzo di quello che potrebbe apparire come:

implicit class FailSafeRDD[T](rdd: RDD[T]) { 
    def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try { 
    fn(rdd) 
    } 
} 

Si potrebbe aggiungere errore argomento di messaggistica in failsafeAction o qualsiasi cosa si vuole fare ogni volta in caso di fallimento. E poi l'uso potrebbe essere il seguente:

val rdd = ??? // Some rdd you already have 
val resultOrException = rdd.failsafeAction { r => r.count() } 

Oltre a questo, immagino che l'approccio "migliore" sia in qualche modo soggettivo alle esigenze applicative.

2

Penso che si potrebbe anche implementare questo con un tentativo di cattura =>

dstream.foreachRDD { case rdd: RDD[String] => 
    rdd.foreach { case string: String => 
     try { 
     val kafkaProducer = ... 
     val msg = ... 
     kafkaProducer.send(msg) 
     } catch { 
     case d: DataException=> 
      val kafkaErrorProducer = ... 
      val errorMsg = ... 
      kafkaErrorProducer.send(errorMsg) 
     case t: Throwable => 
      //further error handling 
     } 
    } 
}