2013-10-20 2 views
5

Ho un esempio che sto costruendo utilizzando Play Framework 2.2.0-scala che utilizza WebSockets per trasmettere i dati a un client. Il problema che sto avendo è che, per qualsiasi motivo, uno dei figli di un attore principale non viene arrestato correttamente. Tutti i registri indicano che è in fase di arresto e che è stato arrestato, ma vedo che non è effettivamente inattivo pubblicando dati su di esso. Ecco po 'di codice, prima con la mia azione di controllo:Scala: l'attore Akka non sta morendo in Play Framework 2.2.0

def scores(teamIds: String) = WebSocket.async[JsValue] { request => 
    val teamIdsArr:Array[String] = teamIds.split(",").distinct.map { el => 
     s"nfl-streaming-scores-${el}" 
    } 

    val scoresStream = Akka.system.actorOf(Props(new ScoresStream(teamIdsArr))) 
    ScoresStream.join(scoresStream) 
    } 

Così, ogni volta che un client si connette, si uniscono ScoresStream che restituisce il rispettivo Iteratee, Enumeratore che WebSocket.async richiede. L'oggetto ScoresStream reale sia simile alla seguente:

object ScoresStream { 

    implicit val timeout = Timeout(5 seconds) 

    def join(scoresStream:ActorRef):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = { 

    (scoresStream ? BeginStreaming).map { 

     case Connected(enumerator) => 
     val iteratee = Iteratee.foreach[JsValue] { _ => 
      Logger.info("Ignore iteratee input.") 
     }.map { _ => 
      Logger.info("Client quitting - killing Actor.") 
      scoresStream ! UnsubscribeAll 
      scoresStream ! PoisonPill 
     } 
     (iteratee,enumerator) 
} 

L'idea è quella di uccidere l'attore principale, ScoresStream, quando il client si disconnette. Lo faccio usando scoresStream ! PoisonPill.

ScoresStream a sua volta crea Pub e Sub casi che sono involucri che si collegano a Redis per la pubblicazione/tracciatura ai messaggi, ecco il codice Attore:

class ScoresStream(teamIds: Array[String]) extends Actor with CreatePubSub with akka.actor.ActorLogging { 

    val (scoresEnumerator, scoresChannel) = Concurrent.broadcast[JsValue] 

    case class Message(kind: String, user: String, message: String) 
    implicit val messageReads = Json.reads[Message] 
    implicit val messageWrites = Json.writes[Message] 

    val sub = context.child("sub") match { 
    case None => createSub(scoresChannel) 
    case Some(c) => c 
    } 

    val pub = context.child("pub") match { 
    case None  => createPub(teamIds) 
    case Some(c) => c 
    } 

    def receive = { 
    case BeginStreaming => { 
     log.info("hitting join...") 
     sub ! RegisterCallback 
     sub ! SubscribeChannel(teamIds) 
     sender ! Connected(scoresEnumerator) 
    } 

    case UnsubscribeAll => { 
     sub ! UnsubscribeChannel(teamIds) 
    } 
    } 

} 

trait CreatePubSub { self:Actor => 
    def createSub(pChannel: Concurrent.Channel[JsValue]) = context.actorOf(Props(new Sub(pChannel)), "sub") 
    def createPub(teamIds: Array[String]) = context.actorOf(Props(new Pub(teamIds)), "pub") 
} 

Infine, ecco il codice vero e secondaria Attore: (Pub doesn 't sembra rilevante qui come lo spegnimento fine):

class Sub(pChannel: Concurrent.Channel[JsValue]) extends Actor with CreatePublisherSubscriber with ActorLogging { 
    val s = context.child("subscriber") match { 
    case None => createSubscriber 
    case Some(c) => c 
    } 

    def callback(pubsub: PubSubMessage) = pubsub match { 
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup") 
    case S(channel, no) => println("subscribed to " + channel + " and count = " + no) 
    case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no) 
    case M(channel, msg) => 
     msg match { 
     // exit will unsubscribe from all channels and stop subscription service 
     case "exit" => 
      println("unsubscribe all ..") 
      pChannel.end 
      r.unsubscribe 

     // message "+x" will subscribe to channel x 
     case x if x startsWith "+" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => } 
      } 

     // message "-x" will unsubscribe from channel x 
     case x if x startsWith "-" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('-', rest @ _*) => r.unsubscribe(rest.toString) 
             pChannel.end 
      } 

     case x => 
     try { 
      log.info("Just got a message: " + x) 
      pChannel.push(Json.parse(x)) 
      } 
      catch { 
      case ex: com.fasterxml.jackson.core.JsonParseException => { 
       log.info("Malformed JSON sent.") 
      } 
      } 
     } 
    } 

    def receive = { 
    case RegisterCallback => { 
     log.info("Creating a subscriber and registering callback") 
     s ! Register(callback) 
    } 
    case SubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("subscribing to channel " + x + " ") } 
     //sub ! Subscribe(Array("scores-5","scores-6")) 
     s ! Subscribe(teamIds) 
    } 
    case UnsubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("unsubscribing from channel " + x + " ") } 
     s ! Unsubscribe(teamIds) 
    } 
    case true => println("Subscriber successfully received message.") 
    case false => println("Something went wrong.") 
    } 
} 

trait CreatePublisherSubscriber { self:Actor => 
    def r = new RedisClient("localhost", 6379) 
    def createSubscriber = context.actorOf(Props(new Subscriber(r)), "subscriber") 
    def createPublisher = context.actorOf(Props(new Publisher(r)), "publisher") 
} 

Ora, quando un client si connette, i messaggi di avvio aspetto sano:

012.351.
[DEBUG] [10/20/2013 00:35:53.618] [application-akka.actor.default-dispatcher-12] [akka://application/user] now supervising Actor[akka://application/user/$c#-54456921] 
[DEBUG] [10/20/2013 00:35:53.619] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.620] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/sub#1376180991] 
[DEBUG] [10/20/2013 00:35:53.621] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/pub/publisher] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.622] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] started ([email protected]) 
Subscriber successfully received message. 
Subscriber successfully received message. 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] now supervising Actor[akka://application/user/$c/sub/subscriber#-1562348862] 
subscribed to nfl-streaming-scores-5 and count = 1 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/pub#-707418539] 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] hitting join... 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] Creating a subscriber and registering callback 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] subscribing to channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.703] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] now supervising Actor[akka://application/user/$c/pub/publisher#1509054514] 

e scollegamento sembra in buona salute:

[info] application - Client quitting - killing Actor. 
unsubscribed from nfl-streaming-scores-5 and count = 0 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] received AutoReceiveMessage Envelope(PoisonPill,Actor[akka://application/deadLetters]) 
[INFO] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] unsubscribing from channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/pub/publisher] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] stopped 
[INFO] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] Message [java.lang.Boolean] from Actor[akka://application/user/$c/sub/subscriber#-1562348862] to Actor[akka://application/user/$c/sub#1376180991] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopping 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopped 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopped 

Ed ecco il problema, dopo che il client è disconnesso, ho intenzione di inviare un messaggio che il momento di arresto attore è stato sottoscritto da:

redis-cli publish "nfl-streaming-scores-5" "{\"test\":\"message\"}" 

e qui si sta mostrando, quando non dovrebbe essere, questo attore dovrebbe tecnicamente essere morto. Altri attori che erano prima di ricevere il messaggio anche, quelli etichettati con $ a/$ b. Posso confermare che nessun altro client è connesso.

[INFO] [10/20/2013 00:38:33.097] [Thread-7] [akka://application/user/$c/sub] Just got a message: {"test":"message"} 

Che cosa è anche un indicatore strana è che i nomi di indirizzi non arrivano mai riutilizzati. Continuo a vedere una tendenza come la sequela di nomi di deporre le uova quando ho scollegare/collegare:

akka://application/user/$c 
akka://application/user/$d 
akka://application/user/$e 

non vedono mai i vecchi riferimenti vengono riutilizzati.

La mia ipotesi qui è che la connessione a Redis non viene chiusa in modo pulito. Non spiega perché i registri dicono che l'attore si è fermato, ma esistono ancora connessioni stabilite per i redis dopo aver eseguito netstat anche dopo che tutti gli attori sono presumibilmente morti. Quando interrompo completamente l'esecuzione dell'applicazione, le connessioni si cancellano. È come se l'annullamento dell'iscrizione stia silenziosamente fallendo e questo è mantenere in vita l'attore e anche la connessione, il che è davvero negativo per molteplici ragioni, perché alla fine il sistema esaurirà i descrittori di file e/o avrà perdite di memoria. C'è qualcosa di ovvio qui che sto sbagliando?

+1

'def r = new RedisClient', penso che val lazy sarebbe meglio, quindi si crea solo una istanza di RedisClient invece di creare ogni volta una nuova istanza quando si chiama 'r.doSomeThing'. – Schleichardt

+0

L'ho provato. L'ho persino spostato fuori dal tratto e direttamente nell'attore. Tuttavia, il risultato finale è lo stesso. usando 'redis-cli', vedo entrare l'UNSUBSCRIBE. La connessione è ancora stabilita, tuttavia. Il modo in cui sto creando/abbattendo gli attori qui penso sia fastidioso. – randombits

+0

I nomi degli attori creati automaticamente non saranno mai riutilizzati, il che è il modo più sicuro per renderli unici. Quello che sospetto è che l'attore dell'abbonato (che non mostri) passa il callback alla biblioteca del client redis dove viene eseguito quando arrivano i messaggi, indipendentemente dall'attore. –

risposta

3

Solo perché si sta fermando l'attore non significa che tutte le risorse di proprietà di quell'attore vengano automaticamente ripulite.Se c'è un RedisClient legato a quell'istanza di attore e questa connessione deve essere arrestata per essere ripulita correttamente, allora dovresti fare qualcosa del genere nel metodo postStop. Sono d'accordo anche con @Schleichardt nel fatto che dovresti cambiare il tuo def r = new RedisClient in val o in un valore lazy val (a seconda dell'ordine di inizializzazione e delle esigenze). In questo modo sai che per istanza di sottoscrittore, c'è solo un singolo RedisClient da ripulire. Non conosco l'API per lo RedisClient che si sta utilizzando, ma supponiamo che abbia un metodo shutdown che interromperà la sua connessione e pulirà le sue risorse. Poi si può aggiungere un semplice postStop all'attore abbonato in questo modo:

override def postStop { 
    r.shutdown 
} 

Supponendo che si effettua il DEF al cambiamento val, questo potrebbe essere quello che stai cercando.