Un progetto su cui sto lavorando richiede la lettura dei messaggi da SQS e ho deciso di utilizzare Akka per distribuire l'elaborazione di questi messaggi.Tasso di polling dei consumatori con Akka, SQS e Camel
Poiché SQS è supportato da Camel e sono presenti funzionalità incorporate per l'utilizzo in Akka nella classe Consumer, ho immaginato che sarebbe stato meglio implementare l'endpoint e leggere i messaggi in questo modo, sebbene non avessi visto molti esempi di gente che lo fa
Il mio problema è che non riesco a interrogare la mia coda abbastanza rapidamente da mantenere la mia coda vuota, o quasi vuota. Quello che inizialmente pensavo era che avrei potuto ottenere un Consumatore per ricevere messaggi su Camel da SQS ad un tasso di X/s. Da lì, ho potuto semplicemente creare più consumatori per ottenere la velocità con cui avevo bisogno di elaborare i messaggi.
mio Consumer:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
Come mostrato, ho impostato delay=1
nonché &maxMessagesPerPoll=10
per migliorare il tasso di messaggi, ma sono in grado di generare più consumatori con lo stesso endpoint.
ho letto nella documentazione che By default endpoints are assumed not to support multiple consumers.
e credo che questo vale anche per gli endpoint SQS così, come la deposizione delle uova più consumatori mi darà un solo consumatore dove dopo l'esecuzione del sistema per un minuto, il messaggio di output è Count for actor: x
al posto del altri che emettono Count for actor: 0
.
Se questo è affatto utile; Sono in grado di leggere circa 33 messaggi al secondo con questa implementazione corrente sul singolo consumatore.
È questo il modo corretto di leggere i messaggi da una coda SQS in Akka? In tal caso, posso arrivare a ridimensionarlo in modo tale da aumentare il mio tasso di consumo dei messaggi più vicino a quello di 900 messaggi al secondo?