2016-02-25 11 views
10

Perché è l'eccezione inPerché Akka Streams ingoia le mie eccezioni?

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 

object TestExceptionHandling { 
    def main(args: Array[String]): Unit = { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer()(defaultActorSystem) 

    Source(List(1, 2, 3)).map { i => 
     if (i == 2) { 
     throw new RuntimeException("Please, don't swallow me!") 
     } else { 
     i 
     } 
    }.runForeach { i => 
     println(s"Received $i") 
    } 
    } 
} 

ignorato silenziosamente? Vedo che lo stream si interrompe dopo aver stampato Received 1, ma non viene registrato nulla. Si noti che il problema non è la configurazione di registrazione in generale, poiché vedo un sacco di output se imposto akka.log-config-on-start = on nel mio file application.conf.

+1

Stai buttando via l'eccezione in quanto si ignora il valore di ritorno di 'runForeach'. –

+0

@ViktorKlang grazie per avermelo fatto notare, ho appena aggiornato la mia risposta! –

risposta

11

Ora sto utilizzando una consuetudine Supervision.Decider che consente di verificare le eccezioni sono correttamente registrati, che può essere impostato in questo modo:

val decider: Supervision.Decider = { e => 
    logger.error("Unhandled exception in stream", e) 
    Supervision.Stop 
} 

implicit val actorSystem = ActorSystem() 
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider) 
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem) 

Inoltre, come è stato sottolineato da Vikor Klang, nell'esempio di cui sopra , l'eccezione potrebbe anche essere "catturato" tramite

Source(List(1, 2, 3)).map { i => 
    if (i == 2) { 
    throw new RuntimeException("Please, don't swallow me!") 
    } else { 
    i 
    } 
}.runForeach { i => 
    println(s"Received $i") 
}.onComplete { 
    case Success(_) => 
    println("Done") 
    case Failure(e) => 
    println(s"Failed with $e") 
} 

Nota tuttavia, che questo approccio non vi aiuterà con

Source(List(1, 2, 3)).map { i => 
    if (i == 2) { 
    throw new RuntimeException("Please, don't swallow me!") 
    } else { 
    i 
    } 
}.to(Sink.foreach { i => 
    println(s"Received $i") 
}).run() 

dal run() restituisce Unit.

+1

'run()' restituisce solo 'Unità' perché mantiene il valore maturato del lato" sinistro "(Keep.left) per impostazione predefinita. se tu avessi usato: toMat (Sink.foreach (...)) (Keep.right) allora funzionerebbe di nuovo. –

4

Ho avuto domande simili quando ho iniziato a usare akk-stream. Supervision.Decider aiuta ma non sempre.

Sfortunatamente non rileva eccezioni generate in ActionPublisher. Lo vedo gestito, ActorPublisher.onError viene chiamato ma non raggiunge Supervision.Decider. Funziona con il semplice flusso fornito nella documentazione.

Gli errori non raggiungono anche l'attore se utilizzo Sink.actorRef.

E per il bene di esperimento ho cercato seguente esempio

val stream = Source(0 to 5).map(100/_) 
stream.runWith(Sink.actorSubscriber(props)) 

In questo caso eccezione è stato catturato da Decider ma mai raggiunto attore abbonato.

Nel complesso, penso che sia un comportamento incoerente. Non riesco a utilizzare un meccanismo per gestire gli errori in Stream.

La mia domanda così originale: Custom Supervision.Decider doesn't catch exception produced by ActorPublisher

E qui è Akka problema in cui è tracciato: https://github.com/akka/akka/issues/18359