2011-08-16 2 views
8

Sto tentando di ottenere un comportamento tollerante ai guasti negli attori Akka. Sto lavorando su un codice che dipende dal fatto che gli attori nel sistema siano disponibili per un lungo periodo di elaborazione. Sto scoprendo che la mia elaborazione si interrompe dopo un paio d'ore (ci dovrebbero volere circa 10 ore) e non succede molto. Credo che i miei attori non si stiano riprendendo dalle eccezioni.Come si imposta la tolleranza d'errore di akka Actor?

Cosa è necessario fare per riavviare permanentemente gli attori su base uno su uno? Mi aspetto che questo può essere fatto da questa documentazione http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

Sto lavorando con Akka 1.1.3 e Scala 2,9

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached 
import akka.dispatch.Dispatchers 
import akka.routing.CyclicIterator 
import akka.routing.LoadBalancer 
import akka.config.Supervision._ 


object TestActor { 
    val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool") 
        .setCorePoolSize(100) 
        .setMaxPoolSize(100) 
        .build 
} 

class TestActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.dispatcher = TestActor.dispatcher 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure") 
     println("Actor: " + name + " Received: " + num) 
     //Thread.sleep(100) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    //callback method for restart handling 
    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    //callback method for restart handling 
    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

trait CyclicLoadBalancing extends LoadBalancer { this: Actor => 
    val testActors: List[ActorRef] 
    val seq = new CyclicIterator[ActorRef](testActors) 
} 

trait TestActorManager extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000) 
    val testActors: List[ActorRef] 
    override def preStart = testActors foreach { self.startLink(_) } 
    override def postStop = { System.out.println("postStop") } 
} 


    object FaultTest { 
    def main(args : Array[String]) : Unit = { 
     println("starting FaultTest.main()") 
     val numOfActors = 5 
     val supervisor = actorOf(
     new TestActorManager with CyclicLoadBalancing { 
      val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i))); 
     } 
    ) 

     supervisor.start(); 

     println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length) 

     val testActor = Actor.registry.actorsFor(classOf[TestActor]).head 

     (1 until 200 toList) foreach { testActor ! _ } 

    } 
    } 

Questo codice imposta 5 attori dietro un LoadBalancer che appena stampare interi che sono inviato a loro eccetto che lanciano Eccezioni su numeri pari per simulare i guasti. Gli interi da 0 a 200 vengono inviati a questi attori. Mi aspetto che i numeri dispari vengano pubblicati, ma tutto sembra chiuso dopo un paio di errori sui numeri pari. L'esecuzione di questo codice con i risultati SBT in questo output:

[info] Running FaultTest 
starting FaultTest.main() 
Loading config [akka.conf] from the application classpath. 
Number of Actors: 5 
Actor: 2 Received: 1 
Actor: 2 Received: 9 
Actor: 1 Received: 3 
Actor: 3 Received: 7 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM 

Quello che penso che sta accadendo qui è che 5 gli attori iniziano, e le prime 5 numeri pari metterli fuori dal mercato e non sono sempre riavviati.

Come è possibile modificare questo codice in modo che gli attori recuperino dalle eccezioni?

Mi aspetto che questo in realtà stampi tutti i numeri dispari da 1 a 200. Penso che ogni attore potrebbe fallire sui numeri pari ma essere riavviato con una cassetta postale intatta sulle eccezioni. Mi aspetto di vedere println da preRestart e postRestart. Cosa deve essere configurato in questo esempio di codice per ottenere che queste cose accadano?

Ecco alcune ipotesi aggiuntive su akka e attori che potrebbero portare al mio fraintendimento. Suppongo che un attore possa essere configurato con un supervisore o un faultkeeper, in modo che venga riavviato e che continui a essere disponibile quando viene generata un'eccezione durante la ricezione. Suppongo che il messaggio che è stato inviato all'attore verrà perso se genera un'eccezione durante la ricezione. Suppongo che vengano chiamati il ​​preRestart() e il postRestart() sull'attore che lancia l'eccezione.

L'esempio di codice rappresenta quello che sto cercando di fare e si basa su Why is my Dispatching on Actors scaled down in Akka?

** Un altro codice di esempio **

Ecco un altro esempio di codice che è più semplice. Sto iniziando un attore che genera eccezioni su numeri pari. Non c'è nessun bilanciamento del carico o altra roba nel modo. Sto tentando di stampare informazioni sull'attore. Sto aspettando di uscire dal programma per un minuto dopo che i messaggi sono stati inviati all'attore e monitorare ciò che sta accadendo.

Mi aspetto che questo stampi i numeri dispari ma sembra che l'attore sieda con i messaggi nella sua casella di posta.

Ho sbagliato l'impostazione OneForOneStrategy? Devo collegare l'attore a qualcosa? Da parte mia, questo tipo di configurazione è fondamentalmente errata? Un Dispatcher deve essere configurato con tolleranza d'errore come? Potrei rovinare i thread nel Dispatcher?

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.ActorRegistry 
import akka.config.Supervision._ 

class SingleActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000) 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure, where does this get logged?") 
     println("Actor: " + name + " Received: " + num) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

object TestSingleActor{ 

    def main(args : Array[String]) : Unit = { 
     println("starting TestSingleActor.main()") 

     val testActor = Actor.actorOf(new SingleActor(1)).start() 

     println("number of actors: " + registry.actors.size) 
     printAllActorsInfo 

     (1 until 20 toList) foreach { testActor ! _ } 

     for(i <- 1 until 120){ 
     Thread.sleep(500) 
     printAllActorsInfo 
     } 
    } 

    def printAllActorsInfo() ={ 
    registry.actors.foreach((a) => 
     println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b " 
       .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted))) 
    } 
} 

sto ottenendo in uscita come:

[info] Running TestSingleActor 
starting TestSingleActor.main() 
Loading config [akka.conf] from the application classpath. 
number of actors: 1 
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1 
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ... 

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM 

risposta

5

Il problema era che ero con il mio file akka.conf. Stavo usando il file akka.conf di riferimento 1.1.3 ad eccezione della linea che configurava i gestori di eventi.

miniera (quello rotto):

event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

di riferimento 1.1.3 (quella che funziona):

event-handlers = ["akka.event.EventHandler$DefaultListener"] 

Con i miei evento gestori di linea di configurazione, riavvio Attore non accadono. Con la linea di riferimento 1.1.3 si riavvia meravigliosamente.

Ho fatto questo cambiamento sulla base di queste istruzioni http://akka.io/docs/akka/1.1.3/general/slf4j.html

Così, per sbarazzarsi dei suggerimenti in quella pagina e tornare al riferimento 1.1.3 akka.conf sono stato in grado di ottenere di guasto attori tolleranti.

1

Credo che il problema termina dopo che i messaggi sono inviati, non si sta cercando di mantenere in vita la vostra applicazione asincrona, e così le principali uscite filo e prende tutto giù con esso.

+0

Se aggiungo un Trhead.sleep (100000) alla fine di main() ottengo: '[info] Running FaultTest all'avvio FaultTest.main() Caricamento config [akka.conf] dal classpath dell'applicazione. Numero di attori: 5 Attore: 0 Ricevuto: 1 Attore: 4 Ricevuto: 3 Attore: 1 Ricevuto: 7 Attore: 1 Ricevuto: 9' e l'uscita fa una pausa ma i numeri aggiuntivi non vengono stampati. Non ho aspettato l'uscita dall'applicazione ma dopo 30-40 secondi non c'era niente. Inoltre, se rimuovo l'errore, i numeri vengono stampati molto rapidamente, in meno di 2 secondi. –