2015-08-25 15 views
5

ho bisogno di fare qualcosa di veramente simile a questo https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scalaCome si consuma flussi secondari raggruppati con mapAsync in Akka flussi

il mio problema è che ho un numero imprecisato di gruppi e se il numero di parallelismo del mapAsync è meno del numero dei gruppi e ho ottenuto l'errore nell'ultima lavandino

Abbattere SynchronousFileSink (/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) causa di un errore a monte (akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)

ho cercato di mettere un buffer in mezzo come suggerito nella guida modello di Akka ruscelli http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html

groupBy { 
    case LoglevelPattern(level) => level 
    case other     => "OTHER" 
}.buffer(1000, OverflowStrategy.backpressure). 
    // write lines of each group to a separate file 
    mapAsync(parallelism = 2) {.... 

ma con lo stesso risultato

+0

Mi chiedo se l'uso di 'mapAsync' serve a qualsiasi scopo in primo luogo? Cosa succede se usi semplicemente 'map'? – jrudolph

+0

con la mappa i gruppi non sono consumati in parallelo/asincrono che è il mio comportamento desiderato – Sammyrulez

+2

Penso che sia un equivoco. Tutti i gruppi sono rappresentati da un 'Source [Something]' (dopo 'groupBy' hai un' Source [Source [Something]] ', giusto?). Quindi, l'unica cosa che devi fare all'interno della 'map' (' foreach' dovrebbe funzionare bene) sarebbe quella di eseguire i sub-flussi che sono un'operazione asincrona. I subflows sarebbero quindi eseguiti da soli e l'elemento 'map' sarebbe libero di accettare il prossimo' Source [Something] '. – jrudolph

risposta

3

Ampliando commento di jrudolph che è del tutto corretta ..

In questo caso non è necessario un mapAsync. Come esempio di base, supponiamo di avere una fonte di tuple

import akka.stream.scaladsl.{Source, Sink} 

def data() = List(("foo", 1), 
        ("foo", 2), 
        ("bar", 1), 
        ("foo", 3), 
        ("bar", 2)) 

val originalSource = Source(data) 

È quindi possibile eseguire un groupBy per creare un Source of Sources

def getID(tuple : (String, Int)) = tuple._1 

//a Source of (String, Source[(String, Int),_]) 
val groupedSource = originalSource groupBy getID 

Ognuna delle Fonti raggruppati possono essere trattati in parallelo con un solo map, non c'è bisogno di niente di speciale. Ecco un esempio di ciascun raggruppamento essendo sommate in un flusso indipendente:

import akka.actor.ActorSystem 
import akka.stream.ACtorMaterializer 

implicit val actorSystem = ActorSystem() 
implicit val mat = ActorMaterializer() 
import actorSystem.dispatcher 

def getValues(tuple : (String, Int)) = tuple._2 

//does not have to be a def, we can re-use the same sink over-and-over 
val sumSink = Sink.fold[Int,Int](0)(_ + _) 

//a Source of (String, Future[Int]) 
val sumSource = 
    groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream 
    } 

Ora tutti i "foo" numeri vengono sommati in parallelo con tutti i numeri "bar".

mapAsync viene utilizzato quando si dispone di una funzione incapsulata che restituisce un Future[T] e si sta tentando di emettere un T invece; quale non è il caso nella tua domanda. Inoltre, mapAsync coinvolge waiting for results che non è reactive ...

+1

Ora che gli stream di akka sono stati uniti con akka e la semantica di Source of Sources è stata sostituita con SubFlows, come si può realizzare un comportamento simile? – AlphaGeek

+0

@AlphaGeek Ho notato che la funzionalità groupBy è cambiata un paio di mesi fa. Il libro di cucina ha un esempio aggiornato basato sull'approccio dei nuovi substrati: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Implementing_reduce-by-key –

+0

@RamonJRomeroyVigil Ricettario non lo spiega Usano la riduzione che è irraggiungibile se abbiamo montanti di elementi in ogni substream. Quello che io (e anche l'autore) voglio è ottenere 'Source' per gruppo. Come lo faremmo? – expert