Possiedo un flusso Akka e voglio che lo streaming invii messaggi in streaming all'incirca ogni secondo.Come limitare un flusso Akka per eseguire e inviare un messaggio solo una volta al secondo?
Ho provato due modi per risolvere questo problema, il primo modo era far sì che il produttore all'inizio del flusso inviasse solo messaggi una volta al secondo quando un messaggio Continua entra in questo attore.
// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }
questo funziona per un breve periodo poi una marea di Continuare messaggi appaiono nel attore ActorPublisher, presumo (indovinare, ma non sono sicuro) da valle tramite back-pressione che richiede i messaggi come la valle può consumare velocemente, ma il monte non produce a un ritmo veloce. Quindi questo metodo fallì.
L'altro modo in cui ho provato era tramite il controllo di contropressione, ho utilizzato uno MaxInFlightRequestStrategy
sullo ActorSubscriber
alla fine dello stream per limitare il numero di messaggi a 1 al secondo. Funziona, ma i messaggi in arrivo arrivano approssimativamente a tre o più alla volta, non solo uno alla volta. Sembra che il controllo della contropressione non cambi immediatamente la velocità dei messaggi in arrivo O i messaggi erano già in coda nello stream e in attesa di essere elaborati.
Quindi il problema è, come posso avere un flusso Akka che può elaborare un messaggio solo al secondo?
ho scoperto che MaxInFlightRequestStrategy
è un modo valido per farlo, ma devo impostare la dimensione del lotto 1, la sua dimensione del lotto è di default 5, che è stato la causa del problema che ho trovato. Inoltre è un modo troppo complicato per risolvere il problema ora che sto guardando la risposta inviata qui.
Hai pensato di usare 'Source.tick'? – cmbaxter
No, fammi dare un'occhiata a questo, grazie. – Phil
puoi anche provare a 'throttle'. –