2012-08-30 8 views
5

Devo pubblicare messaggi di tipi diversi per il flusso di eventi, e quei messaggi dovrebbero avere priorità diverse ad esempio, se 10 messaggi di tipo A sono stati pubblicati, e dopotutto un messaggio di tipo B viene postato, e la priorità di B di è superiore alla priorità di A - il messaggio B deve essere prelevato dallo dal prossimo attore anche se ci sono 10 messaggi di tipo A in coda.Akka :: Utilizzo di messaggi con priorità diverse sul flusso di eventi in ActorSystem

Ho letto sui messaggi prioritari here e ha creato il mio semplice attuazione di tale casella di posta:

class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

    PriorityGenerator { 
     case ServerPermanentlyDead => println("Priority:0"); 0 
     case ServerDead => println("Priority:1"); 1 
     case _ => println("Default priority"); 10 
    } 

) 

poi ho configurato in application.conf

akka { 

    actor { 

     prio-dispatcher { 
      type = "Dispatcher" 
      mailbox-type = "mailbox.PrioritizedMailbox" 
     } 

    } 

} 

e cablato nel mio attore:

private val myActor = actors.actorOf(
    Props[MyEventHandler[T]]. 
    withRouter(RoundRobinRouter(HIVE)). 
    withDispatcher("akka.actor.prio-dispatcher"). 
    withCreator(
    new Creator[Actor] { 
     def create() = new MyEventHandler(storage) 
    }), name = "eventHandler") 

Sto utilizzando ActorSystem.eventStream.publish in per inviare messaggi, e il mio attore è iscritto ad esso (posso vedere nei registri che i messaggi vengono elaborati, ma nell'ordine FIFO ).

Tuttavia sembra che non sia sufficiente, perché nei registri/console non ho mai visto i messaggi come "Priorità predefinita". Mi sto perdendo qualcosa qui? L'approccio descritto funziona con flussi di eventi o solo con invocazioni dirette di inviando un messaggio sull'attore? E come faccio a ricevere messaggi con priorità con evento eventStream?

risposta

10

Il tuo problema è che i tuoi attori sono follemente veloci in modo che i messaggi vengano elaborati prima che abbiano il tempo di accodarsi, quindi non ci può essere alcuna priorizzazione fatta dalla casella di posta. L'esempio che segue dimostra il punto:

trait Foo 
    case object X extends Foo 
    case object Y extends Foo 
    case object Z extends Foo 

    class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config) 
extends UnboundedPriorityMailbox( 
    PriorityGenerator { 
     case X ⇒ 0 
     case Y ⇒ 1 
     case Z ⇒ 2 
     case _ ⇒ 10 
    }) 

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString( 
     """ prio-dispatcher { 
     type = "Dispatcher" 
      mailbox-type = "%s" 
     }""".format(classOf[PrioritizedMailbox].getName))) 
     val latch = new java.util.concurrent.CountDownLatch(1) 
     val a = s.actorOf(Props(new akka.actor.Actor { 
     latch.await // Just wait here so that the messages are queued up 
inside the mailbox 
     def receive = { 
      case any ⇒ /*println("Processing: " + any);*/ sender ! any 
     } 
     }).withDispatcher("prio-dispatcher")) 
     implicit val sender = testActor 
     a ! "pig" 
     a ! Y 
     a ! Z 
     a ! Y 
     a ! X 
     a ! Z 
     a ! X 
     a ! "dog" 

     latch.countDown() 

     Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) } 
     s.shutdown() 

Questo test viene superato a pieni voti