2014-04-30 20 views
5

Sto utilizzando pika.BlockingConnection in un utente che esegue alcune attività per ciascun messaggio. Ho anche aggiunto la gestione dei segnali in modo che il consumatore muoia correttamente dopo aver eseguito completamente tutte le attività.gestione del segnale pika/python

Mentre il messaggio viene elaborato e il segnale viene ricevuto, ottengo solo "signal received" dalla funzione, ma il codice non esce. Così, ho deciso di controllare anche il segnale ricevuto alla fine della funzione di callback. La domanda è: quante volte controllo il segnale, poiché ci saranno molte più funzioni in questo codice. C'è un modo migliore di gestire i segnali senza esagerare?

Questa è la mia prima domanda qui, quindi fammi sapere se sono necessari ulteriori dettagli.

+0

Il codice attuale ingoiare SIGTERM o SIGINT fino alla ricezione del messaggio successivo tramite la coda, a quel punto dovrebbe uscire. È in realtà quello che vuoi? Perché non usare il metodo 'signal_handler' per chiamare direttamente' sys.exit (0) '? – dano

+1

Desidero che la gestione del segnale avvenga in due modi: 1) Durante l'attesa dei messaggi, dovrebbe semplicemente morire 2) Mentre consuma un messaggio, dovrebbe completare il lavoro corrente e quindi morire. Il mio codice corrente incorpora la seconda condizione, ma non la prima. Questo è il problema. Sarebbe anche possibile? – user3295878

+0

Sì, è possibile. Aggiungerò una risposta – dano

risposta

3

penso che questo fa quello che stai cercando:

#!/usr/bin/python 

import signal 
import sys 
import pika 
from contextlib import contextmanager 

received_signal = False 
processing_callback = False 

def signal_handler(signal, frame): 
    global received_signal 
    print "signal received" 
    received_signal = True 
    if not processing_callback: 
     sys.exit() 

signal.signal(signal.SIGINT, signal_handler) 
signal.signal(signal.SIGTERM, signal_handler) 

@contextmanager 
def block_signals(): 
    global processing_callback 
    processing_callback = True 
    try: 
     yield 
    finally: 
     processing_callback = False 
     if received_signal: 
      sys.exit() 

def callback(ch, method, properties, body): 
    with block_signals: 
     print body 
     sum(xrange(0, 200050000)) # sleep gets interrupted by signals, this doesn't. 
     mq_channel.basic_ack(delivery_tag=method.delivery_tag) 
     print "Message consumption complete" 

if __name__ == "__main__":  
    try: 
     mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     mq_channel = mq_connection.channel() 
     print ' [*] Waiting for messages. To exit press CTRL+C' 
     mq_channel.basic_consume(callback, queue='test') 
     mq_channel.start_consuming() 
    except Exception as e: 
     mq_channel.close() 
     sys.exit() 

ho usato un contextmanager per gestire bloccando i segnali, in modo che tutta la logica è nascosto al di fuori della richiamata stessa. Questo dovrebbe anche rendere più facile il riutilizzo del codice. Giusto per chiarire come si sta lavorando, è equivalente a questo:

def callback(ch, method, properties, body): 
    global processing_callback 
    processing_callback = True 
    try: 
     print body 
     sum(xrange(0, 200050000)) 
     mq_channel.basic_ack(delivery_tag=method.delivery_tag) 
     print "Message consumption complete" 
    finally: 
     processing_callback = False 
     if received_signal: 
      sys.exit() 
+0

Usa xrange invece che range, o la tua memoria esploderà e comincerà a battere mentre esce su disco. – Dunk

+0

@ Dunk, grazie, risolto. – dano