ho bisogno di fare qualcosa di veramente simile a questo https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scalaCome si consuma flussi secondari raggruppati con mapAsync in Akka flussi
il mio problema è che ho un numero imprecisato di gruppi e se il numero di parallelismo del mapAsync è meno del numero dei gruppi e ho ottenuto l'errore nell'ultima lavandino
Abbattere SynchronousFileSink (/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) causa di un errore a monte (akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)
ho cercato di mettere un buffer in mezzo come suggerito nella guida modello di Akka ruscelli http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html
groupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....
ma con lo stesso risultato
Mi chiedo se l'uso di 'mapAsync' serve a qualsiasi scopo in primo luogo? Cosa succede se usi semplicemente 'map'? – jrudolph
con la mappa i gruppi non sono consumati in parallelo/asincrono che è il mio comportamento desiderato – Sammyrulez
Penso che sia un equivoco. Tutti i gruppi sono rappresentati da un 'Source [Something]' (dopo 'groupBy' hai un' Source [Source [Something]] ', giusto?). Quindi, l'unica cosa che devi fare all'interno della 'map' (' foreach' dovrebbe funzionare bene) sarebbe quella di eseguire i sub-flussi che sono un'operazione asincrona. I subflows sarebbero quindi eseguiti da soli e l'elemento 'map' sarebbe libero di accettare il prossimo' Source [Something] '. – jrudolph