2015-07-09 28 views
10

Ho una coda SQS che viene costantemente popolata da un consumatore di dati e ora sto cercando di creare il servizio che estrarrà questi dati da SQS usando il boto di Python.Le migliori pratiche per il polling di una coda SQS AWS e l'eliminazione dei messaggi ricevuti dalla coda?

Il modo in cui l'ho progettato è che avrò 10-20 thread che tentano tutti di leggere i messaggi dalla coda SQS e quindi fare ciò che devono fare sui dati (business logic), prima di tornare alla coda per ottenere il prossimo batch di dati una volta che hanno finito. Se non ci sono dati, aspetteranno solo che alcuni dati siano disponibili.

ho due aree non sono sicuro circa con questo disegno

  1. E 'una questione di chiamare receive_message() con un valore time_out lunga e se non viene restituito nei 20 secondi (massimo consentito) quindi riprova? Oppure esiste un metodo di blocco che ritorna solo quando i dati sono disponibili?
  2. Ho notato che una volta ricevuto un messaggio, non viene eliminato dalla coda, devo ricevere un messaggio e quindi inviare un'altra richiesta dopo averlo ricevuto per eliminarlo dalla coda? sembra un po 'eccessivo.

Grazie

risposta

11

La capacità a lungo polling del metodo receive_message() è il modo più efficace per il polling SQS. Se questo ritorna senza alcun messaggio, raccomanderei un breve ritardo prima di riprovare, specialmente se hai più lettori. Potresti anche voler fare un ritardo incrementale in modo che ogni successiva lettura vuota aspetti un po 'di più, solo così non finirai per essere rallentato da AWS.

E sì, è necessario eliminare il messaggio dopo averlo letto o verrà nuovamente visualizzato nella coda. Questo può essere davvero molto utile nel caso in cui un lavoratore legga un messaggio e poi fallisca prima che possa elaborare completamente il messaggio. In quel caso, sarebbe re-accodato e letto da un altro lavoratore. Si desidera inoltre assicurarsi che il timeout dell'invisibilità dei messaggi sia impostato in modo che sia sufficientemente lungo da consentire al lavoratore di disporre del tempo sufficiente per elaborare il messaggio prima che riappaia automaticamente sulla coda. Se necessario, i lavoratori possono regolare il timeout mentre vengono elaborati se impiegano più tempo del previsto.

3

Un'altra opzione consiste nell'impostare un'applicazione utente utilizzando AWS Beanstalk come descritto in this blogpost.

Invece del polling lungo che utilizza boto3, l'applicazione del pallone riceve il messaggio come un oggetto JSON in un post HTTP. Il percorso HTTP e il tipo di messaggio che viene impostato sono configurabili nella scheda AWS Elastic Configurazione Beanstalk:

enter image description here

fagiolo magico elastico AWS ha il vantaggio di essere in grado di scalare in modo dinamico il numero di lavoratori in funzione della dimensione della coda SQS, insieme ai vantaggi della gestione della distribuzione.

This è un'applicazione di esempio che ho trovato utile come modello.

2

Se si desidera un modo semplice per impostare un listener che includa la cancellazione automatica dei messaggi al termine dell'elaborazione e l'invio automatico delle eccezioni a una coda specificata, è possibile utilizzare il pacchetto pySqsListener.

È possibile impostare un ascoltatore come questo:

from sqs_listener import SqsListener 

class MyListener(SqsListener): 
    def handle_message(self, body, attributes, messages_attributes): 
     run_my_function(body['param1'], body['param2'] 

listener = MyListener('my-message-queue', 'my-error-queue') 
listener.listen() 

Al momento il pacchetto usa breve polling, ma sto pensando di fare il tipo di polling configurabile.

Disclaimer: Sono l'autore di detto pacchetto.