2016-03-25 10 views
5

Come descritto in akka streams documentation ho cercato di creare una riserva di lavoratori (flussi):Pool di lavoratori con Akka Streams

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = { 
    import GraphDSL.Implicits._ 

    Flow.fromGraph(GraphDSL.create() { implicit b => 
     val balancer = b.add(Balance[In](workerCount)) 
     val merge = b.add(Merge[Out](workerCount)) 

     for (_ <- 1 to workerCount) { 
     balancer ~> worker ~> merge 
     } 
     FlowShape(balancer.in, merge.out) 
    }) 
    } 

poi usato questa funzione per eseguire un flusso in parallelo:

def main(args: Array[String]) { 
    val system = ActorSystem() 
    implicit val mat = ActorMaterializer.create(system) 

    val flow = Flow[Int].map(e => { 
     println(e) 
     Thread.sleep(1000) // 1 second 
     e 
    }) 

    Source(Range.apply(1, 10).toList) 
     .via(balancer(flow, 3)) 
     .runForeach(e => {}) 
    } 

Ottengo l'output previsto 1, 2, 3, 4, 5, 6, 7, 8, 9 ma i numeri appaiono ad una velocità 1 al secondo (nessun parallelismo). Cosa sto facendo di sbagliato?

+0

che dire di contesto di esecuzione? Se si utilizza un pool di thread di dimensioni fisse con uno normale, è –

+0

Ciò significa che la dimensione del contesto predefinito è 1? Potresti per favore precisare qual è il modo preferito per configurare il contesto di esecuzione? – Mihai238

+0

Nessun contesto predefinito non è fisso, probabilmente stai importando il contesto implicito globale, che dipenderà da troppe cose come la versione, puoi provare 'implicit val ec = ExecutionContext.fromExecutor (Executors.newFixedThreadPool (10))' –

risposta

1

Come sottolineato da Endre Varga, il flusso stesso deve essere contrassegnato con .async.

Ma anche in questo caso, il comportamento non è deterministico perché gli stadi asincroni hanno una dimensione predefinita del buffer di 16 e il bilanciatore può inviare tutti i messaggi allo stesso operatore.

Di conseguenza, balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge comporterebbe il comportamento desiderato.

Per una risposta data da un progetto di vedere: https://github.com/akka/akka/issues/20146#issuecomment-201381356

3

I documenti in quella sezione non sono aggiornati e verranno risolti nella prossima versione. Fondamentalmente tutto ciò che serve è chiamare .async sul flusso stesso. In questo modo, si disegna una "scatola" attorno al flusso (che si può immaginare come una scatola con una porta di input e output) che impedirà la fusione attraverso quella casella. In questo modo, essenzialmente, tutti i lavoratori saranno su attori dedicati. Il resto del grafico (gli stadi di trasmissione e fusione) condividerà un altro attore (non verranno eseguiti su attori separati, la casella asincrona protegge solo il flusso, le cose all'esterno saranno comunque fuse).

+0

Credo che sia così che dovrebbe essere, ma credo anche che non è così. 'for (i <- 1 to workerCount) {bilanciamento ~> worker.async ~> unione}' sembra non funzionare. – lpiepiora

+2

Come ho notato nel tuo ticket (e dopo aver giocato in giro, mi sono confuso io stesso) si scopre che la dimensione del buffer di default per gli stadi asincroni è 16 e il saldo finisce inviando tutti i messaggi ad un singolo stadio dal momento che segnala come ancora con buffer spazio. Se si inviano più messaggi (come 100) o si imposta la dimensione del buffer della fase di lavoro su 1, verranno visualizzati i risultati desiderati. –

+0

Sì, giusto, anche l'impostazione 'waitForAllDownstreams = true' potrebbe aiutare un po 'con esso. Penso che (non l'ho davvero controllato), quello che succede è che come hai detto i primi report downstream, e tutti i messaggi vengono inviati ad esso.Con 'waitForAllDownstreams', la distribuzione è leggermente migliore – lpiepiora