Ho un esempio che sto costruendo utilizzando Play Framework 2.2.0-scala che utilizza WebSockets per trasmettere i dati a un client. Il problema che sto avendo è che, per qualsiasi motivo, uno dei figli di un attore principale non viene arrestato correttamente. Tutti i registri indicano che è in fase di arresto e che è stato arrestato, ma vedo che non è effettivamente inattivo pubblicando dati su di esso. Ecco po 'di codice, prima con la mia azione di controllo:Scala: l'attore Akka non sta morendo in Play Framework 2.2.0
def scores(teamIds: String) = WebSocket.async[JsValue] { request =>
val teamIdsArr:Array[String] = teamIds.split(",").distinct.map { el =>
s"nfl-streaming-scores-${el}"
}
val scoresStream = Akka.system.actorOf(Props(new ScoresStream(teamIdsArr)))
ScoresStream.join(scoresStream)
}
Così, ogni volta che un client si connette, si uniscono ScoresStream
che restituisce il rispettivo Iteratee, Enumeratore che WebSocket.async richiede. L'oggetto ScoresStream reale sia simile alla seguente:
object ScoresStream {
implicit val timeout = Timeout(5 seconds)
def join(scoresStream:ActorRef):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = {
(scoresStream ? BeginStreaming).map {
case Connected(enumerator) =>
val iteratee = Iteratee.foreach[JsValue] { _ =>
Logger.info("Ignore iteratee input.")
}.map { _ =>
Logger.info("Client quitting - killing Actor.")
scoresStream ! UnsubscribeAll
scoresStream ! PoisonPill
}
(iteratee,enumerator)
}
L'idea è quella di uccidere l'attore principale, ScoresStream
, quando il client si disconnette. Lo faccio usando scoresStream ! PoisonPill
.
ScoresStream
a sua volta crea Pub
e Sub
casi che sono involucri che si collegano a Redis per la pubblicazione/tracciatura ai messaggi, ecco il codice Attore:
class ScoresStream(teamIds: Array[String]) extends Actor with CreatePubSub with akka.actor.ActorLogging {
val (scoresEnumerator, scoresChannel) = Concurrent.broadcast[JsValue]
case class Message(kind: String, user: String, message: String)
implicit val messageReads = Json.reads[Message]
implicit val messageWrites = Json.writes[Message]
val sub = context.child("sub") match {
case None => createSub(scoresChannel)
case Some(c) => c
}
val pub = context.child("pub") match {
case None => createPub(teamIds)
case Some(c) => c
}
def receive = {
case BeginStreaming => {
log.info("hitting join...")
sub ! RegisterCallback
sub ! SubscribeChannel(teamIds)
sender ! Connected(scoresEnumerator)
}
case UnsubscribeAll => {
sub ! UnsubscribeChannel(teamIds)
}
}
}
trait CreatePubSub { self:Actor =>
def createSub(pChannel: Concurrent.Channel[JsValue]) = context.actorOf(Props(new Sub(pChannel)), "sub")
def createPub(teamIds: Array[String]) = context.actorOf(Props(new Pub(teamIds)), "pub")
}
Infine, ecco il codice vero e secondaria Attore: (Pub
doesn 't sembra rilevante qui come lo spegnimento fine):
class Sub(pChannel: Concurrent.Channel[JsValue]) extends Actor with CreatePublisherSubscriber with ActorLogging {
val s = context.child("subscriber") match {
case None => createSubscriber
case Some(c) => c
}
def callback(pubsub: PubSubMessage) = pubsub match {
case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
println("unsubscribe all ..")
pChannel.end
r.unsubscribe
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
pChannel.end
}
case x =>
try {
log.info("Just got a message: " + x)
pChannel.push(Json.parse(x))
}
catch {
case ex: com.fasterxml.jackson.core.JsonParseException => {
log.info("Malformed JSON sent.")
}
}
}
}
def receive = {
case RegisterCallback => {
log.info("Creating a subscriber and registering callback")
s ! Register(callback)
}
case SubscribeChannel(teamIds) => {
teamIds.foreach { x => log.info("subscribing to channel " + x + " ") }
//sub ! Subscribe(Array("scores-5","scores-6"))
s ! Subscribe(teamIds)
}
case UnsubscribeChannel(teamIds) => {
teamIds.foreach { x => log.info("unsubscribing from channel " + x + " ") }
s ! Unsubscribe(teamIds)
}
case true => println("Subscriber successfully received message.")
case false => println("Something went wrong.")
}
}
trait CreatePublisherSubscriber { self:Actor =>
def r = new RedisClient("localhost", 6379)
def createSubscriber = context.actorOf(Props(new Subscriber(r)), "subscriber")
def createPublisher = context.actorOf(Props(new Publisher(r)), "publisher")
}
Ora, quando un client si connette, i messaggi di avvio aspetto sano:
012.351.[DEBUG] [10/20/2013 00:35:53.618] [application-akka.actor.default-dispatcher-12] [akka://application/user] now supervising Actor[akka://application/user/$c#-54456921]
[DEBUG] [10/20/2013 00:35:53.619] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] started ([email protected])
[DEBUG] [10/20/2013 00:35:53.620] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/sub#1376180991]
[DEBUG] [10/20/2013 00:35:53.621] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/pub/publisher] started ([email protected])
[DEBUG] [10/20/2013 00:35:53.622] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] started ([email protected])
Subscriber successfully received message.
Subscriber successfully received message.
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] started ([email protected])
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] now supervising Actor[akka://application/user/$c/sub/subscriber#-1562348862]
subscribed to nfl-streaming-scores-5 and count = 1
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/pub#-707418539]
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] hitting join...
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] Creating a subscriber and registering callback
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] subscribing to channel nfl-streaming-scores-5
[DEBUG] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] started ([email protected])
[DEBUG] [10/20/2013 00:35:53.703] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] now supervising Actor[akka://application/user/$c/pub/publisher#1509054514]
e scollegamento sembra in buona salute:
[info] application - Client quitting - killing Actor.
unsubscribed from nfl-streaming-scores-5 and count = 0
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] received AutoReceiveMessage Envelope(PoisonPill,Actor[akka://application/deadLetters])
[INFO] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] unsubscribing from channel nfl-streaming-scores-5
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopping
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] stopping
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/pub/publisher] stopped
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] stopped
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] stopped
[INFO] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] Message [java.lang.Boolean] from Actor[akka://application/user/$c/sub/subscriber#-1562348862] to Actor[akka://application/user/$c/sub#1376180991] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopping
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopped
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopped
Ed ecco il problema, dopo che il client è disconnesso, ho intenzione di inviare un messaggio che il momento di arresto attore è stato sottoscritto da:
redis-cli publish "nfl-streaming-scores-5" "{\"test\":\"message\"}"
e qui si sta mostrando, quando non dovrebbe essere, questo attore dovrebbe tecnicamente essere morto. Altri attori che erano prima di ricevere il messaggio anche, quelli etichettati con $ a/$ b. Posso confermare che nessun altro client è connesso.
[INFO] [10/20/2013 00:38:33.097] [Thread-7] [akka://application/user/$c/sub] Just got a message: {"test":"message"}
Che cosa è anche un indicatore strana è che i nomi di indirizzi non arrivano mai riutilizzati. Continuo a vedere una tendenza come la sequela di nomi di deporre le uova quando ho scollegare/collegare:
akka://application/user/$c
akka://application/user/$d
akka://application/user/$e
non vedono mai i vecchi riferimenti vengono riutilizzati.
La mia ipotesi qui è che la connessione a Redis non viene chiusa in modo pulito. Non spiega perché i registri dicono che l'attore si è fermato, ma esistono ancora connessioni stabilite per i redis dopo aver eseguito netstat
anche dopo che tutti gli attori sono presumibilmente morti. Quando interrompo completamente l'esecuzione dell'applicazione, le connessioni si cancellano. È come se l'annullamento dell'iscrizione stia silenziosamente fallendo e questo è mantenere in vita l'attore e anche la connessione, il che è davvero negativo per molteplici ragioni, perché alla fine il sistema esaurirà i descrittori di file e/o avrà perdite di memoria. C'è qualcosa di ovvio qui che sto sbagliando?
'def r = new RedisClient', penso che val lazy sarebbe meglio, quindi si crea solo una istanza di RedisClient invece di creare ogni volta una nuova istanza quando si chiama 'r.doSomeThing'. – Schleichardt
L'ho provato. L'ho persino spostato fuori dal tratto e direttamente nell'attore. Tuttavia, il risultato finale è lo stesso. usando 'redis-cli', vedo entrare l'UNSUBSCRIBE. La connessione è ancora stabilita, tuttavia. Il modo in cui sto creando/abbattendo gli attori qui penso sia fastidioso. – randombits
I nomi degli attori creati automaticamente non saranno mai riutilizzati, il che è il modo più sicuro per renderli unici. Quello che sospetto è che l'attore dell'abbonato (che non mostri) passa il callback alla biblioteca del client redis dove viene eseguito quando arrivano i messaggi, indipendentemente dall'attore. –