2016-03-25 18 views
5

Possiedo un flusso Akka e voglio che lo streaming invii messaggi in streaming all'incirca ogni secondo.Come limitare un flusso Akka per eseguire e inviare un messaggio solo una volta al secondo?

Ho provato due modi per risolvere questo problema, il primo modo era far sì che il produttore all'inizio del flusso inviasse solo messaggi una volta al secondo quando un messaggio Continua entra in questo attore.

// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }

questo funziona per un breve periodo poi una marea di Continuare messaggi appaiono nel attore ActorPublisher, presumo (indovinare, ma non sono sicuro) da valle tramite back-pressione che richiede i messaggi come la valle può consumare velocemente, ma il monte non produce a un ritmo veloce. Quindi questo metodo fallì.

L'altro modo in cui ho provato era tramite il controllo di contropressione, ho utilizzato uno MaxInFlightRequestStrategy sullo ActorSubscriber alla fine dello stream per limitare il numero di messaggi a 1 al secondo. Funziona, ma i messaggi in arrivo arrivano approssimativamente a tre o più alla volta, non solo uno alla volta. Sembra che il controllo della contropressione non cambi immediatamente la velocità dei messaggi in arrivo O i messaggi erano già in coda nello stream e in attesa di essere elaborati.

Quindi il problema è, come posso avere un flusso Akka che può elaborare un messaggio solo al secondo?


ho scoperto che MaxInFlightRequestStrategy è un modo valido per farlo, ma devo impostare la dimensione del lotto 1, la sua dimensione del lotto è di default 5, che è stato la causa del problema che ho trovato. Inoltre è un modo troppo complicato per risolvere il problema ora che sto guardando la risposta inviata qui.

+2

Hai pensato di usare 'Source.tick'? – cmbaxter

+0

No, fammi dare un'occhiata a questo, grazie. – Phil

+0

puoi anche provare a 'throttle'. –

risposta

10

È possibile inserire i propri elementi attraverso il flusso di regolazione, che si rifletterà sulla sorgente veloce, oppure è possibile utilizzare la combinazione di tick e zip.

La soluzione pugno sarebbe stato così:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val throttlingFlow = Flow[Long].throttle(
    // how many elements do you allow 
    elements = 1, 
    // in what unit of time 
    per = 1.second, 
    maximumBurst = 0, 
    // you can also set this to Enforcing, but then your 
    // stream will collapse if exceeding the number of elements/s 
    mode = ThrottleMode.Shaping 
) 

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println)) 

La seconda soluzione sarebbe come questo:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))