2015-06-11 13 views
8

Ecco lo schema che ho incontrato:Akka: modello per combinare i messaggi provenienti da più bambini

Un attore A ha più figli C1, ..., Cn. Alla ricezione di un messaggio, lo A lo invia a ciascuno dei suoi figli, che eseguono ciascuno dei calcoli sul messaggio e, al termine, lo rinviano a A. A vorrebbe quindi combinare i risultati di tutti i bambini per passare a un altro attore.

Come sarebbe una soluzione per questo problema? O si tratta di un anti-modello? In quale caso come dovrebbe essere affrontato questo problema?

Ecco un esempio banale che, si spera, illustra la mia soluzione attuale. Le mie preoccupazioni sono che è un codice duplicato (fino alla simmetria); non si estende molto bene a "molti" bambini; e rende piuttosto difficile vedere cosa sta succedendo.

import akka.actor.{Props, Actor} 

case class Tagged[T](value: T, id: Int) 

class A extends Actor { 
    import C1._ 
    import C2._ 

    val c1 = context.actorOf(Props[C1], "C1") 
    val c2 = context.actorOf(Props[C2], "C2") 
    var uid = 0 
    var c1Results = Map[Int, Int]() 
    var c2Results = Map[Int, Int]() 

    def receive = { 
    case n: Int => { 
     c1 ! Tagged(n, uid) 
     c2 ! Tagged(n, uid) 
     uid += 1 
    } 
    case Tagged(C1Result(n), id) => c2Results get id match { 
     case None => c1Results += (id -> n) 
     case Some(m) => { 
     c2Results -= id 
     context.parent ! (n, m) 
     } 
    } 
    case Tagged(C2Result(n), id) => c1Results get id match { 
     case None => c2Results += (id -> n) 
     case Some(m) => { 
     c1Results -= id 
     context.parent ! (m, n) 
     } 
    } 
    } 
} 

class C1 extends Actor { 
    import C1._ 

    def receive = { 
    case Tagged(n: Int, id) => Tagged(C1Result(n), id) 
    } 
} 

object C1 { 
    case class C1Result(n: Int) 
} 

class C2 extends Actor { 
    import C2._ 

    def receive = { 
    case Tagged(n: Int, id) => Tagged(C2Result(n), id) 
    } 
}  

object C2 { 
    case class C2Result(n: Int) 
} 

Se pensate che il codice è dio terribile, take it easy su di me, ho appena iniziato Akka di apprendimento;)

+0

Dai un'occhiata alla soluzione per http://stackoverflow.com/questions/26319471/akka-design-principals/26328702#26328702. Praticamente lo stesso problema. –

+0

Hmm mi scuso, ho pensato che qualcosa del genere sarebbe già stato chiesto, ma non ho trovato la domanda. – Mullefa

risposta

7

Nel caso di molti - o di un numero variabile di - attori bambini, il ask pattern suggerito da Zim-Zam sarà ottenere rapidamente di mano.

Il aggregator pattern è progettato per aiutare con questo tipo di situazione. Fornisce un tratto Aggregator che è possibile utilizzare in un attore per eseguire la logica di aggregazione.

Un attore client che desidera eseguire un'aggregazione può avviare un'istanza di attore basata su Aggregator e inviarlo un messaggio che darà il via al processo di aggregazione.

È necessario creare un nuovo aggregatore per ciascuna operazione di aggregazione e terminare con l'invio del risultato (quando ha ricevuto tutte le risposte o un timeout).

Un esempio di questo modello per sommare i valori interi detenuti dagli attori rappresentati dalla classe Bambino sono elencati di seguito. (Si noti che non v'è alcuna necessità per loro di essere tutti i bambini sono controllati dallo stesso attore genitore:. Il SummationAggregator solo bisogno di un insieme di ActorRefs)

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.duration._ 

import akka.actor._ 
import akka.contrib.pattern.Aggregator 

object Child { 
    def props(value: Int): Props = Props(new Child(value)) 

    case object GetValue 
    case class GetValueResult(value: Int) 
} 

class Child(value: Int) extends Actor { 
    import Child._ 

    def receive = { case GetValue => sender ! GetValueResult(value) } 
} 

object SummationAggregator { 
    def props = Props(new SummationAggregator) 

    case object TimedOut 
    case class StartAggregation(targets: Seq[ActorRef]) 
    case object BadCommand 
    case class AggregationResult(sum: Int) 
} 

class SummationAggregator extends Actor with Aggregator { 
    import Child._ 
    import SummationAggregator._ 

    expectOnce { 
    case StartAggregation(targets) => 
     // Could do what this handler does in line but handing off to a 
     // separate class encapsulates the state a little more cleanly 
     new Handler(targets, sender()) 
    case _ => 
     sender ! BadCommand 
     context stop self 
    } 

    class Handler(targets: Seq[ActorRef], originalSender: ActorRef) { 
    // Could just store a running total and keep track of the number of responses 
    // that we are awaiting... 
    var valueResults = Set.empty[GetValueResult] 

    context.system.scheduler.scheduleOnce(1.second, self, TimedOut) 

    expect { 
     case TimedOut => 
     // It might make sense to respond with what we have so far if some responses are still awaited... 
     respondIfDone(respondAnyway = true) 
    } 

    if (targets.isEmpty) 
     respondIfDone() 
    else 
     targets.foreach { t => 
     t ! GetValue 
     expectOnce { 
      case vr: GetValueResult => 
      valueResults += vr 
      respondIfDone() 
     } 
     } 

    def respondIfDone(respondAnyway: Boolean = false) = { 
     if (respondAnyway || valueResults.size == targets.size) { 
     originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v }) 
     context stop self 
     } 
    } 
    } 
} 

Per utilizzare questo SummationAggregator dal tuo attore genitore che si possa fare:

context.actorOf(SummationAggregator.props) ! StartAggregation(children) 

e quindi gestire AggregationResult da qualche parte nella ricezione del genitore.

+0

Ciao per la tua risposta Jeremy. Se ho capito bene, sembra che uno dei vantaggi di questo approccio sia la creazione di un attore per raccogliere tutti i risultati, e non un attore per attore secondario (come nel modello di domanda). Tuttavia, guardando il link che hai postato, viene creata un'istanza anonima di ogni attore secondario per richiesta che (con la mia comprensione cruda) sembra minare alcuni dei benefici. Invece, potrebbe essere creata una singola istanza di ogni attore secondario nella classe di inclusione da utilizzare da ciascun aggregatore creato? Se "si" per favore eccetto la mia modifica che lo dimostra. – Mullefa

+1

Penso che l'esempio nel collegamento a cui si fa riferimento stia utilizzando gli attori temporanei (uno per ciascun tipo di account) anziché gli attori esistenti (ad esempio gli attori figli esistenti). Ho sostituito il tuo esempio con un semplice caso che riassume i valori detenuti in qualsiasi numero di attori (anche se non devono essere bambini). Spero che questo lo renda più chiaro. –

+0

Questo è eccellente! Grazie per aver dedicato del tempo a dare questo esempio. – Mullefa

5

È possibile utilizzare ? invece di ! sugli attori bambini - questa volontà fare in modo che gli attori secondari restituiscano un Future con i loro (eventuali) risultati, ovvero tutto è ancora non bloccante fino a quando non si ottiene Await il risultato dello Future. L'attore principale può quindi comporre questi Futures e inviarlo a un altro attore - conoscerà già ogni identità Future's e quindi non sarà necessario preoccuparsi di codificare ciascun messaggio in modo da poterlo riordinare in un secondo momento. Ecco un semplice esempio in cui ogni bambino restituisce un valore casuale Double e si desidera dividere il valore restituito del primo figlio per il valore restituito del secondo figlio (vale a dire l'ordine è importante).

import scala.concurrent.duration._ 

import akka.actor.{Props, Actor} 
import akka.pattern.{ask, pipe} 
import akka.util.Timeout 

class A extends Actor { 
    val c1 = context.actorOf(Props[C], "C1") 
    val c2 = context.actorOf(Props[C], "C2") 

    // The ask operation involves creating an internal actor for handling 
    // this reply, which needs to have a timeout after which it is 
    // destroyed in order not to leak resources; see more below. 
    implicit val timeout = Timeout(5 seconds) 

    def receive = { 
    case _ => { 
     val f1 = c1 ? "anything" // Future[Any] 
     val f2 = c2 ? "anything" // Future[Any] 
     val result: Future[Double] = for { 
     d1 <- f1.mapTo[Double] 
     d2 <- f2.mapTo[Double] 
     } yield d1/d2 
    } 
} 

class C extends Actor { 
    def receive = { 
    case _ => // random Double 
    } 
} 
+0

Grazie mille per la risposta. Ho approfondito un po 'il codice dopo aver letto la sezione _Ask: Send-And-Receive-Future_ di [akka doc] (http://doc.akka.io/docs/akka/snapshot/scala/actors.html) – Mullefa