2015-05-25 4 views
5

Considerare il classico programma "Conteggio parole". Conta il numero di parole in tutti i file in qualche directory. Master riceve una directory e divide il lavoro tra gli attori Worker (ogni lavoratore lavora con un file). Si tratta di pseudo-codice:Modelli di pianificazione Akka

class WordCountWorker extends Actor { 

    def receive = { 
    case FileToCount(fileName:String) => 
     val count = countWords(fileName) 
     sender ! WordCount(fileName, count) 
    } 
} 

class WordCountMaster extends Actor { 
    def receive = { 
    case StartCounting(docRoot) => // sending each file to worker 
     val workers = createWorkers() 
     fileNames = scanFiles(docRoot) 
     sendToWorkers(fileNames, workers) 
    case WordCount(fileName, count) => // aggregating results 
     ... 

    } 
} 

Ma io voglio eseguire questo programma Word Count dal programma (ad esempio ogni 1 minuto), fornendo directory diverse per la scansione.

E Akka prevede bel modo per il messaggio di programmazione che passa:

system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName)) 

Ma il problema con sopra scheduler inizia quando scheduler invia un nuovo messaggio da barrare, ma il messaggio precedente non è ancora elaborato (ad esempio ho inviato un messaggio a scansione di una grande directory, e dopo 1 secondo ho inviato un altro messaggio per scansionare un'altra directory, quindi l'operazione di elaborazione della prima directory non è ancora completata). Di conseguenza, il mio WordCountMaster riceverà i messaggi WordCount da lavoratori che stanno elaborando diverse directory.

Come soluzione alternativa invece di pianificare l'invio di messaggi, posso pianificare l'esecuzione di un blocco di codice che creerà ogni volta nuovoWordCountMaster. Cioè una directory = uno WordCountMaster. Ma penso che sia inefficiente, e anch'io ho bisogno di fornire nomi univoci per WordCountMaster per evitare InvalidActorNameException.

Quindi la mia domanda è: dovrei creare nuovo WordCountMaster per ogni spunta come ho menzionato nel precedente paragrafo? O ci sono idee/schemi migliori su come ridisegnare questo programma per supportare la pianificazione?


qualche aggiornamento: In caso di creare un attore Maestro per directory, ho alcuni problemi:

  1. problema con denominazione attori

InvalidActorNameException: Nome attore [WordCountMaster ] non è unico!

e

InvalidActorNameException: Nome attore [WordCountWorker] non è unico!

Posso risolvere questo problema solo non fornendo il nome dell'attore. Ma in questo caso i miei attori ricevono nomi generati automaticamente, come $a, $b ecc. Non va bene per me.

  1. problema con config:

voglio escludere la configurazione dei miei router per application.conf. Cioè Voglio fornire la stessa configurazione per ogni router WordCountWorker.Ma dal momento che io non sto controllando i nomi di attori non posso usare la configurazione di seguito, perché non so i nomi degli attori:

/wordCountWorker{ 
    router = smallest-mailbox-pool 
    nr-of-instances = 5 
    dispatcher = word-counter-dispatcher 
    } 
+0

Puoi dire di più sul motivo per cui ricevere vecchi messaggi WordCount è un problema? – kybernetikos

+0

@kybernetikos Se invio istantaneamente ai miei messaggi 'WordCountMaster' 2 con diverse directory:' StartCounting (directory_1) 'e' StartCounting (directory_2) ', quindi' WordCountMaster' riceverà risultati per diverse directory. Cioè questo messaggio conterrà file provenienti da diverse directory 'WordCount (fileName, count)' – MyTitle

+0

sì, ma mi chiedo perché questo sia un problema. Non sei in grado di identificare la directory dal nome del file? – kybernetikos

risposta

4

Io non sono un esperto Akka, ma penso che l'approccio di avere un attore per aggregazione non è inefficiente È necessario mantenere le aggregazioni concorrenti separate in qualche modo. Puoi dare ad ogni aggregazione un id in modo da tenerli separati dall'ID nell'unico e solo attore principale, oppure puoi usare la denominazione degli attori Akka e la logica del ciclo di vita e delegare ogni aggregazione per ogni giro di conteggio ad un attore che vivrà solo per quella logica di aggregazione.

Per me l'utilizzo di un attore per aggregazione sembra essere più elegante.

Inoltre si ricorda che Akka ha un'implementazione per il modello di aggregazione come descritto here

+0

Grazie per la risposta, ho deciso di creare un attore principale per aggregazione come suggerito. Per favore, puoi spiegare in modo più approfondito come posso applicare Aggregation Pattern al programma WordCount? Ho letto questo articolo e non mi è chiaro come utilizzare il modello di aggregazione nel mio caso. Grazie – MyTitle

+0

Come detto, non sono un esperto, quindi segui il mio consiglio con cautela :) Con il modello di aggregazione è possibile all'inizio del master di aggregazione iterare sui file e creare i lavoratori. Ciascuno dei lavoratori invierà un messaggio 'WordCount' al master. Nel master puoi quindi avere qualcosa come 'val handle = expect {case WordCount => hereYouProcessTheDataFromOneWorker; se (allWorkersAnswered) unexpect (handle); caso TimeOut => handleTimeOut; unexpect (handle)}. Si prega di dare un'occhiata al campione nel link nella risposta. –

2

Si dovrebbe assumere become/unbecome funzionalità dei lavoratori. Se il tuo lavoratore inizia a scansionare una grande cartella usa become per cambiare il comportamento dell'attore che ignora un altro messaggio (o una risposta che non lo elabora), dopo che la scansione della directory invia il messaggio con il conteggio delle parole e unbecome al comportamento standard.

1

In primo luogo. al problema di denominazione: basta nominare i tuoi attori in modo dinamico e univoco, qualcosa del genere:
WorkerActor + "-" + nomefile ... o ... MasterActor + "-" + directoryName
O mi manca qualcosa?

In secondo luogo, perché la pianificazione? Non sarebbe più logico iniziare a lavorare sulla prossima directory quando il primo è finito? Se la pianificazione è un requisito, poi vedo molte soluzioni diverse per il tuo problema e cercherò di affrontare alcuni di loro:

1.
Tre gerarchia di livelli:
MasterActor -> DirectoryActor -> WorkerActor
Creare un nuovo attore di directory per ogni nuova directory e un nuovo worker per ogni file.

2.
Due gerarchia di livelli:
MasterActor -> WorkerActor
Si crea un nuovo lavoratore per ogni file.
due opzioni per identificare i risultati ricevuti:
a) distribuire lavoro ai lavoratori chiedendo e risultati aggregati tramite operazioni a termine
b) prevede una ID messaggio nel lavoro (ad esempio, il nome della directory)

3.
Gerarchia a due livelli con bilanciamento del carico:
Come l'opzione 2, ma non si crea un nuovo worker per ogni file, si ha un numero fisso di lavoratori con un dispatcher di bilanciamento o un router di cassette postali più piccolo.

4.
Una gerarchia a livello con i futures:
L'attore maestro non ha figli, lo fa di lavoro e di aggregati risultati con solo futures.

Raccomando anche di leggere sul modello di aggregazione Akka come suggerito da Gregor Raýman nella sua risposta.

2

Personalmente, non userei gli attori per risolvere questo problema di aggregazione, ma comunque, ecco qui.

Non penso che ci sia un modo ragionevole per gestire il conteggio delle parole per più directory contemporaneamente nel modo in cui lo suggerisci. Dovresti invece avere un attore "maestro maestro" che supervisiona i contatori. Quindi, hai tre classi di attori:

  • FileCounter: riceve un file da leggere e lo elabora. Al termine, invierà il risultato al mittente.
  • CounterSupervisor: questo tiene traccia di quale FileCounter ha completato i propri lavori e invierà il risultato a WordCountForker.
  • WordCountForker: questo attore tiene traccia di quale sottosistema ha terminato il proprio compito e, se sono tutti occupati, crea un nuovo CounterSupervisor per risolvere il problema.

Il contatore di file dovrebbe essere il più semplice da scrivere.

class FileCounter() extends Actor with ActorLogging { 

    import context.dispatcher 

    override def preStart = { 
     log.info("FileCounter Actor initialized") 
    } 

    def receive = { 
     case CountFile(file) => 
      log.info("Counting file: " + file.getAbsolutePath) 

      FileIO.readFile(file).foreach { data => 
       val words = data 
        .split("\n") 
        .map { _.split(" ").length } 
        .sum 

       context.parent ! FileCount(words) 
      } 
    } 
} 

E ora l'attore che controlla i contatori di file.

class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging { 

    var total = 0 
    var files: Array[File] = _ 
    var pendingActors = 0 

    override def preStart = { 
     for(i <- 1 to actorPool) 
      context.actorOf(FileCounter.props(), name = s"counter$i") 
    } 

    def receive = { 
     case CountDirectory(base) => 
      log.info("Now counting starting from directory : " + base.getAbsolutePath) 
      total = 0 
      files = FileIO.getAllFiles(base) 
      pendingActors = 0 
      for(i <- 1 to actorPool if(i < files.length)) { 
       pendingActors += 1 
       context.child(s"counter$i").get ! CountFile(files.head) 
       files = files.tail 
      } 

     case FileCount(count) => 
      total += count 
      pendingActors -= 1 
      if(files.length > 0) { 
       sender() ! CountFile(files.head) 
       files = files.tail 
       pendingActors += 1 
      } else if(pendingActors == 0) { 
       context.parent ! WordCountTotal(total) 
      } 
    } 
} 

E poi l'attore che supervisiona i supervisori.

class WordCountForker(counterActors: Int) extends Actor with ActorLogging { 

    var busyActors: List[(ActorRef, ActorRef)] = Nil 
    var idleActors: List[ActorRef] = _ 

    override def preStart = { 
     val first = context.actorOf(CounterSupervisor.props(counterActors)) 
     idleActors = List(first) 
     log.info(s"Initialized first supervisor with $counterActors file counters.") 
    } 

    def receive = { 
     case msg @ CountDirectory(dir) => 
      log.info("Count directory received") 
      val counter = idleActors match { 
       case Nil => 
        context.actorOf(CounterSupervisor.props(counterActors)) 
       case head :: rest => 
        idleActors = rest 
        head 
      } 
      counter ! msg 
      busyActors = (counter, sender()) :: busyActors 

     case msg @ WordCountTotal(n) => 
      val path = sender().path.toString() 
      val index = busyActors.indexWhere { _._1.path.toString == path } 
      val (counter, replyTo) = busyActors(index) 
      replyTo ! msg 
      idleActors = counter :: idleActors 
      busyActors = busyActors.patch(index, Nil, 1) 
    } 
} 

ho lasciato fuori alcune parti della risposta di tenerlo conciso, per quanto possibile, se volete vedere il resto del codice I posted a Gist.

Inoltre, per quanto riguarda le vostre preoccupazioni in merito all'efficienza, la soluzione qui eviterà che ci sia un sottosistema per directory, ma ne verranno generati ancora più di uno se ce n'è bisogno.