2013-11-04 5 views
6

Un progetto su cui sto lavorando richiede la lettura dei messaggi da SQS e ho deciso di utilizzare Akka per distribuire l'elaborazione di questi messaggi.Tasso di polling dei consumatori con Akka, SQS e Camel

Poiché SQS è supportato da Camel e sono presenti funzionalità incorporate per l'utilizzo in Akka nella classe Consumer, ho immaginato che sarebbe stato meglio implementare l'endpoint e leggere i messaggi in questo modo, sebbene non avessi visto molti esempi di gente che lo fa

Il mio problema è che non riesco a interrogare la mia coda abbastanza rapidamente da mantenere la mia coda vuota, o quasi vuota. Quello che inizialmente pensavo era che avrei potuto ottenere un Consumatore per ricevere messaggi su Camel da SQS ad un tasso di X/s. Da lì, ho potuto semplicemente creare più consumatori per ottenere la velocità con cui avevo bisogno di elaborare i messaggi.

mio Consumer:

import akka.camel.{CamelMessage, Consumer} 
import akka.actor.{ActorRef, ActorPath} 

class MyConsumer() extends Consumer { 
    def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" 
    var count = 0 

    def receive = { 
    case msg: CamelMessage => { 
     count += 1 
    } 
    case _ => { 
     println("Got something else") 
    } 
    } 

    override def postStop(){ 
    println("Count for actor: " + count) 
    } 
} 

Come mostrato, ho impostato delay=1 nonché &maxMessagesPerPoll=10 per migliorare il tasso di messaggi, ma sono in grado di generare più consumatori con lo stesso endpoint.

ho letto nella documentazione che By default endpoints are assumed not to support multiple consumers. e credo che questo vale anche per gli endpoint SQS così, come la deposizione delle uova più consumatori mi darà un solo consumatore dove dopo l'esecuzione del sistema per un minuto, il messaggio di output è Count for actor: x al posto del altri che emettono Count for actor: 0.

Se questo è affatto utile; Sono in grado di leggere circa 33 messaggi al secondo con questa implementazione corrente sul singolo consumatore.

È questo il modo corretto di leggere i messaggi da una coda SQS in Akka? In tal caso, posso arrivare a ridimensionarlo in modo tale da aumentare il mio tasso di consumo dei messaggi più vicino a quello di 900 messaggi al secondo?

risposta

5

Purtroppo Camel attualmente non supporta il consumo parallelo di messaggi su SQS.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

di affrontare questo ho scritto il mio attore per prelevare i messaggi lotti SQS utilizzando l'AWS-java-sdk.

def receive = { 
    case BeginPolling => { 
     // re-queue sending asynchronously 
     self ! BeginPolling 
     // traverse the response 
     val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] 
     val messages = sqs.receiveMessage(receiveMessageRequest).getMessages 
     messages.toList.foreach { 
     node => { 
      deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle)) 
      //log.info("Node body: {}", node.getBody) 
      filterSupervisor ! node.getBody 
     } 
     } 
     if(deleteEntryList.size() > 0){ 
     val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList) 
     sqs.deleteMessageBatch(deleteMessageBatchRequest) 
     } 
    } 

    case _ => { 
     log.warning("Unknown message") 
    } 
    } 

Anche se non sono sicuro se questo è il miglior implementazione, e potrebbe ovviamente essere migliorato in modo che le richieste non stanno colpendo costantemente una coda vuota, si fa per le mie attuali esigenze di essere in grado di eseguire il polling messaggi dalla stessa coda.

Ottenendo circa 133 (messaggi/secondo)/attore da SQS con questo.

1

Camel 2.15 supporta i ConcorrentConsumers, anche se non sono sicuro di quanto sia utile in quanto non so se il supporto per cammello akka è 2.15 e non so se avere un attore Consumer fa la differenza anche se ci sono più utenti.