2015-04-22 17 views
7

Sto chiamando un attore che utilizza il modello ask all'interno di un'applicazione Spray e restituisce il risultato come risposta HTTP. Mappo i fallimenti dall'attore a un codice di errore personalizzato.Risoluzione del futuro Akka da chiedere in caso di guasto

val authActor = context.actorOf(Props[AuthenticationActor]) 

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user => 
    complete(StatusCodes.OK, user) 
} 

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = { 
onComplete(f) { 
    case Success(value: T) => cb(value) 
    case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage) 
    case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.") 
    //In reality this returns a custom error object. 
} 
} 

questo funziona correttamente quando l'authActor invia un fallimento, ma se l'authActor genera un'eccezione, non succede nulla fino a quando il timeout chiedere completa. Per esempio:

override def receive: Receive = { 
    case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token") 
} 

So che i documenti dicono che Akka

Per completare il futuro con un'eccezione è necessario inviare un messaggio di mancata al mittente. Questo non viene eseguito automaticamente quando un attore genera un'eccezione durante l'elaborazione di un messaggio.

ma dato che io uso richiede un sacco di interfaccia tra gli attori di routing Spray e gli attori di servizio, preferirei non avvolgere la parte di ricezione di ogni attore bambino con un try/catch. Esiste un modo migliore per ottenere la gestione automatica delle eccezioni negli attori figli e risolvere immediatamente il futuro in caso di eccezione?

Modifica: questa è la mia soluzione corrente. Tuttavia, è abbastanza complicato farlo per ogni attore bambino.

override def receive: Receive = { 
case default => 
    try { 
    default match { 
     case _ => throw new ServiceException("")//Actual code would go here 
    } 
    } 
    catch { 
    case se: ServiceException => 
     logger.error("Service error raised:", se) 
     sender ! Failure(se) 
    case ex: Exception => 
     sender ! Failure(ex) 
     throw ex 
    } 
} 

In questo modo se si tratta di un errore previsto (cioè ServiceException), è gestita creando un guasto. Se è inatteso, restituisce immediatamente un errore in modo che il futuro venga risolto, ma genera l'eccezione in modo che possa essere gestito da SupervisorStrategy.

+0

Bene ... inviare un messaggio di errore prima di lanciare l'eccezione. –

+0

Questo è quello che non volevo fare - ho detto ** Preferirei non avvolgere la parte di ricezione di ogni attore bambino con un try/catch **. Questo è un esempio di giocattolo, è perfettamente possibile che io non controlli dove viene lanciata l'eccezione. –

+0

Beh ... sai ... una delle strade fondamentali dei sistemi distribuiti resilienti è quella di "rendere gli errori espliciti". Pensa ai vari errori che possono accadere ... rendili espliciti. Se puoi avere errori "TypeSafe" ... ancora meglio. –

risposta

13

Se si desidera un modo per fornire invio automatico di una risposta al mittente in caso di un'eccezione imprevista, quindi qualcosa di simile potrebbe funzionare per voi:

trait FailurePropatingActor extends Actor{ 
    override def preRestart(reason:Throwable, message:Option[Any]){ 
    super.preRestart(reason, message) 
    sender() ! Status.Failure(reason) 
    } 
} 

sovrascriviamo preRestart e propagare il fallimento torna al mittente come Status.Failure che causerà un errore Future a monte in errore. Inoltre, è importante chiamare super.preRestart qui perché è lì che si verifica l'arresto figlio. L'utilizzo di questo in un attore simile a questa:

case class GetElement(list:List[Int], index:Int) 
class MySimpleActor extends FailurePropatingActor { 
    def receive = { 
    case GetElement(list, i) => 
     val result = list(i) 
     sender() ! result 
    } 
} 

Se dovessi chiamare un'istanza di questo attore in questo modo:

import akka.pattern.ask 
import concurrent.duration._ 

val system = ActorSystem("test") 
import system.dispatcher 
implicit val timeout = Timeout(2 seconds) 
val ref = system.actorOf(Props[MySimpleActor]) 
val fut = ref ? GetElement(List(1,2,3), 6) 

fut onComplete{ 
    case util.Success(result) => 
    println(s"success: $result") 

    case util.Failure(ex) => 
    println(s"FAIL: ${ex.getMessage}") 
    ex.printStackTrace()  
}  

allora sarebbe corretto colpire il mio Failure blocco. Ora, il codice in quel tratto base funziona bene quando Futures non sono coinvolti nell'attore che sta estendendo quel tratto, come il semplice attore qui. Ma se si utilizza Future s, è necessario fare attenzione poiché le eccezioni che si verificano nello Future non causano riavvii nell'attore e inoltre, in preRestart, la chiamata a sender() non restituirà il riferimento corretto perché l'attore si è già spostato in il prossimo messaggio. Un attore come questo dimostra che problema:

class MyBadFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 

    def receive = { 
    case GetElement(list, i) => 
     val orig = sender() 
     val fut = Future{ 
     val result = list(i) 
     orig ! result 
     }  
    } 
} 

Se dovessimo usare questo attore nel codice di test precedente, ci sarebbe sempre ottenere un timeout nella situazione fallimento.Per mitigare questo, è necessario convogliare i risultati di futuri al mittente in questo modo:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut pipeTo sender() 
    } 
} 

In questo caso particolare, l'attore stesso non viene riavviato perché non ha incontrato un'eccezione non rilevata. Ora, se il vostro attore aveva bisogno di fare un po 'di elaborazione addizionale dopo il futuro, è possibile tubo di nuovo a sé e in modo esplicito non riuscire quando si ottiene un Status.Failure:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2 

    case Status.Failure(ex) => 
     throw ex 
    } 
} 

Se questo comportamento diventa comune, è possibile renderlo disponibile a qualunque gli attori ne hanno bisogno:

trait StatusFailureHandling{ me:Actor => 
    def failureHandling:Receive = { 
    case Status.Failure(ex) => 
     throw ex  
    } 
} 

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = myReceive orElse failureHandling 

    def myReceive:Receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2   
    } 
} 
+1

Questo sembra buono, grazie! –