Come descritto in akka streams documentation ho cercato di creare una riserva di lavoratori (flussi):Pool di lavoratori con Akka Streams
def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
import GraphDSL.Implicits._
Flow.fromGraph(GraphDSL.create() { implicit b =>
val balancer = b.add(Balance[In](workerCount))
val merge = b.add(Merge[Out](workerCount))
for (_ <- 1 to workerCount) {
balancer ~> worker ~> merge
}
FlowShape(balancer.in, merge.out)
})
}
poi usato questa funzione per eseguire un flusso in parallelo:
def main(args: Array[String]) {
val system = ActorSystem()
implicit val mat = ActorMaterializer.create(system)
val flow = Flow[Int].map(e => {
println(e)
Thread.sleep(1000) // 1 second
e
})
Source(Range.apply(1, 10).toList)
.via(balancer(flow, 3))
.runForeach(e => {})
}
Ottengo l'output previsto 1, 2, 3, 4, 5, 6, 7, 8, 9
ma i numeri appaiono ad una velocità 1 al secondo (nessun parallelismo). Cosa sto facendo di sbagliato?
che dire di contesto di esecuzione? Se si utilizza un pool di thread di dimensioni fisse con uno normale, è –
Ciò significa che la dimensione del contesto predefinito è 1? Potresti per favore precisare qual è il modo preferito per configurare il contesto di esecuzione? – Mihai238
Nessun contesto predefinito non è fisso, probabilmente stai importando il contesto implicito globale, che dipenderà da troppe cose come la versione, puoi provare 'implicit val ec = ExecutionContext.fromExecutor (Executors.newFixedThreadPool (10))' –