2016-04-13 10 views
10

Durante la ricezione di eventi, Akka Actors elaborerà un messaggio alla volta, bloccando finché una richiesta non è stata completata prima di passare al messaggio successivo.Attendere la chiamata futura asincrona prima dell'elaborazione del messaggio successivo in Akka

Questo funziona bene per le attività sincrone/di blocco, tuttavia se si desidera eseguire una richiesta asincrona/non bloccante, Akka continuerà l'elaborazione senza attendere il completamento dell'attività.

Ad esempio:

def doThing():Future[Unit] = /* Non blocking request here */ 

def receive = { 
    case DoThing => doThing() pipeTo sender 
} 

Questo chiamerà doThing() e avviare l'elaborazione del futuro, ma non aspetterà che sia completato prima di elaborare il messaggio successivo - sarà semplice eseguire i seguenti messaggi nella coda più velocemente possibile.

In sostanza, sembra che Akka consideri che "restituire un futuro" sia "finito di elaborare" e passare al messaggio successivo.

Al fine di elaborare un messaggio alla volta, sembra che ho bisogno di bloccare attivamente il filo attore per fermarlo farlo

def receive = { 
    case DoThing => sender ! blocking(Await.result(doThing())) 
} 

Questo si sente come un approccio molto sbagliato - è artificialmente bloccando un thread nel codice che dovrebbe altrimenti essere completamente non-bloccante.

Quando si confronta Akka con, per esempio, con gli attori di elisir, possiamo facilmente evitare questo problema in primo luogo utilizzando una coda per richiedere il messaggio successivo senza dover bloccare artificialmente.

C'è un modo in Akka a uno

a) Attendere un Future per completare prima di elaborare il messaggio successivo senza bloccare il thread.

b) Utilizzare una chiamata coda esplicita o qualche altro meccanismo per utilizzare un flusso di lavoro basato su pull anziché basato su push?

+1

Forse un modello di tiratura del lavoro sarebbe adatto alle tue esigenze: quando il Futuro è completo, puoi farla accodare un nuovo messaggio per elaborare il risultato. – childofsoong

+0

L'attore deve bloccare o gestire i messaggi in arrivo in qualche modo, per tenere il passo con le richieste. Se l'interruzione dell'elaborazione è la cosa più naturale da fare, è possibile utilizzare comportamenti diversi in base allo stato di elaborazione e nascondere i messaggi per la durata dell'azione asincrona, vedere http://doc.akka.io/docs/akka/snapshot /scala/actors.html#Become_Unbecome e http://doc.akka.io/docs/akka/snapshot/scala/actors.html#Stash –

risposta

6

Come è stato suggerito nei commenti, è possibile utilizzare la caratteristica Stash (http://doc.akka.io/docs/akka/current/scala/actors.html#Stash) per archiviare i messaggi in arrivo mentre si attende la risoluzione Future.

È necessario salvare il mittente corrente in modo da non chiudere in modo non corretto il riferimento dell'attore mittente. È possibile ottenere ciò attraverso una semplice case-case come quella definita di seguito.

class MyActor extends Actor with Stash { 

    import context.dispatcher 

    // Save the correct sender ref by using something like 
    // case class WrappedFuture(senderRef: ActorRef, result: Any) 
    def doThing(): Future[WrappedFuture] = ??? 

    override def receive: Receive = { 
    case msg: DoThing => 
     doThing() pipeTo self 

     context.become({ 
     case WrappedFuture(senderRef, result) => 
      senderRef ! result 
      unstashAll() 
      context.unbecome() 
     case newMsg: DoThing => 
      stash() 
     }, discardOld = false) 
    } 
} 
+0

Il problema con questo approccio è che mentre si sta aspettando il/i futuro/i per tornare, stai effettivamente bloccando tutti i messaggi in arrivo, anche se ciò non sarebbe necessario (quasi come una chiamata IO bloccante). Quindi devi fare molta attenzione a non renderlo un collo di bottiglia nella tua applicazione. – egbokul

0

Invece di avere un attore che fare con questo problema, hanno una catena di due:

  • Attore 1 riceve i messaggi iniziali, inizia tutto chiama il IO e li fonde in una delle prossime
  • L'attore 2 riceve i risultati dei Futures uniti.

Questo non garantisce la conservazione dell'ordine dei messaggi, quindi se necessario, Actor 2 deve essere a conoscenza dei messaggi che Actor 1 ha visto e potenzialmente memorizzare i primi messaggi su se stesso.

Non sono a conoscenza di nulla in Akka che risolva questo problema. Forse c'è una libreria che implementa uno schema come questo?