2015-02-16 10 views
5

Ho due istanze separate di RabbitMQ. Sto cercando di trovare il modo migliore per ascoltare gli eventi di entrambi.Python e RabbitMQ - Il modo migliore per ascoltare consumare eventi da più canali?

Ad esempio, posso consumare eventi contro uno con il seguente:

credentials = pika.PlainCredentials(user, pass) 
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials)) 
channel = connection.channel() 
result = channel.queue_declare(Exclusive=True) 
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*') 
channel.basic_consume(callback_func, result.method.queue, no_ack=True) 
self.channel.start_consuming() 

ho una seconda serie, "host2", che mi piacerebbe ascoltare pure. Ho pensato di creare due thread separati per farlo, ma da quello che ho letto, pika non è thread-safe. C'è un modo migliore? O sarebbe sufficiente creare due thread separati, ognuno dei quali ascolta un'istanza di Rabbit diversa (host1 e host2)?

risposta

24

La risposta a "qual è il modo migliore" dipende in gran parte dal modello di utilizzo delle code e da cosa intendi per "migliore". Dato che non posso ancora commentare le domande, cercherò di suggerire alcune possibili soluzioni.

In ogni esempio, assumerò che lo scambio sia già stato dichiarato.

discussioni

Si può consumare messaggio per due code su host separati in unico processo utilizzando pika.

Hai ragione - come its own FAQ states, pika non è thread-safe, ma può essere utilizzato in modalità multi-thread creando connessioni a host RabbitMQ per thread. Fare questo esempio eseguito in thread utilizzando il modulo threading appare come segue:

import pika 
import threading 


class ConsumerThread(threading.Thread): 
    def __init__(self, host, *args, **kwargs): 
     super(ConsumerThread, self).__init__(*args, **kwargs) 

     self._host = host 

    # Not necessarily a method. 
    def callback_func(self, channel, method, properties, body): 
     print("{} received '{}'".format(self.name, body)) 

    def run(self): 
     credentials = pika.PlainCredentials("guest", "guest") 

     connection = pika.BlockingConnection(
      pika.ConnectionParameters(host=self._host, 
             credentials=credentials)) 

     channel = connection.channel() 

     result = channel.queue_declare(exclusive=True) 

     channel.queue_bind(result.method.queue, 
          exchange="my-exchange", 
          routing_key="*.*.*.*.*") 

     channel.basic_consume(self.callback_func, 
           result.method.queue, 
           no_ack=True) 

     channel.start_consuming() 


if __name__ == "__main__": 
    threads = [ConsumerThread("host1"), ConsumerThread("host2")] 
    for thread in threads: 
     thread.start() 

ho dichiarato callback_func come un metodo puramente da utilizzare durante la stampa ConsumerThread.name corpo del messaggio. Potrebbe anche essere una funzione esterna alla classe ConsumerThread.

Processi

In alternativa, si può sempre e solo eseguire un unico processo con codice del consumo per coda che si desidera consumare eventi.

import pika 
import sys 


def callback_func(channel, method, properties, body): 
    print(body) 


if __name__ == "__main__": 
    credentials = pika.PlainCredentials("guest", "guest") 

    connection = pika.BlockingConnection(
     pika.ConnectionParameters(host=sys.argv[1], 
            credentials=credentials)) 

    channel = connection.channel() 

    result = channel.queue_declare(exclusive=True) 

    channel.queue_bind(result.method.queue, 
         exchange="my-exchange", 
         routing_key="*.*.*.*.*") 

    channel.basic_consume(callback_func, result.method.queue, no_ack=True) 

    channel.start_consuming() 

e poi gestita da:

$ python single_consume.py host1 
$ python single_consume.py host2 # e.g. on another console 

Se il lavoro che stai facendo sul messaggio per code è CPU-heavy e finché il numero di core nella CPU> = numero di consumatori, generalmente è meglio usare questo approccio, a meno che le code non siano vuote il più delle volte e gli utenti non utilizzeranno questa CPU *.

asincrono

Un'altra alternativa è quella di comportare una certa quadro asincrona (ad esempio Twisted) e funzionante tutto in unico filo.

Non è più possibile utilizzare BlockingConnection in codice asincrono; fortunatamente, pika ha adattatore per il Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection 
from pika.connection import ConnectionParameters 
from twisted.internet import protocol, reactor, task 
from twisted.python import log 


class Consumer(object): 
    def on_connected(self, connection): 
     d = connection.channel() 
     d.addCallback(self.got_channel) 
     d.addCallback(self.queue_declared) 
     d.addCallback(self.queue_bound) 
     d.addCallback(self.handle_deliveries) 
     d.addErrback(log.err) 

    def got_channel(self, channel): 
     self.channel = channel 

     return self.channel.queue_declare(exclusive=True) 

    def queue_declared(self, queue): 
     self._queue_name = queue.method.queue 

     self.channel.queue_bind(queue=self._queue_name, 
           exchange="my-exchange", 
           routing_key="*.*.*.*.*") 

    def queue_bound(self, ignored): 
     return self.channel.basic_consume(queue=self._queue_name) 

    def handle_deliveries(self, queue_and_consumer_tag): 
     queue, consumer_tag = queue_and_consumer_tag 
     self.looping_call = task.LoopingCall(self.consume_from_queue, queue) 

     return self.looping_call.start(0) 

    def consume_from_queue(self, queue): 
     d = queue.get() 

     return d.addCallback(lambda result: self.handle_payload(*result)) 

    def handle_payload(self, channel, method, properties, body): 
     print(body) 


if __name__ == "__main__": 
    consumer1 = Consumer() 
    consumer2 = Consumer() 

    parameters = ConnectionParameters() 
    cc = protocol.ClientCreator(reactor, 
           TwistedProtocolConnection, 
           parameters) 
    d1 = cc.connectTCP("host1", 5672) 
    d1.addCallback(lambda protocol: protocol.ready) 
    d1.addCallback(consumer1.on_connected) 
    d1.addErrback(log.err) 

    d2 = cc.connectTCP("host2", 5672) 
    d2.addCallback(lambda protocol: protocol.ready) 
    d2.addCallback(consumer2.on_connected) 
    d2.addErrback(log.err) 

    reactor.run() 

Questo approccio sarebbe ancora meglio, i più code che ci si consumano da e meno CPU-bound il lavoro eseguire da parte dei consumatori è *.

Python 3

Dal momento che hai citato pika, mi sono limitato a soluzioni Python 2.x-based, in quanto non è ancora pika porting.

Ma nel caso in cui si desideri passare a> = 3.3, una possibile opzione è quella di utilizzare asyncio con uno dei protocolli AMQP (il protocollo con cui si parla con RabbitMQ), ad es. asynqp o aioamqp.

* - si prega di notare che questi sono consigli molto superficiali - nella maggior parte dei casi la scelta non è così ovvia; quale sarà il meglio per te dipende dalla "saturazione" delle code (messaggi/ora), che lavoro fai dopo aver ricevuto questi messaggi, quale ambiente gestisci i tuoi consumatori in ecc .; non c'è modo per essere sicuri diverso da quello di tutte le implementazioni di riferimento

+0

Grazie, molto utile. – blindsnowmobile

+0

Prego. :) Ho anche indicato un'altra cosa nella modifica. – Unit03

0

Di seguito è riportato un esempio di come io uso un caso RabbitMQ ascoltare 2 code allo stesso tempo:

import pika 
import threading 

threads=[] 
def client_info(channel):  
    channel.queue_declare(queue='proxy-python') 
    print (' [*] Waiting for client messages. To exit press CTRL+C') 


    def callback(ch, method, properties, body): 
     print (" Received %s" % (body)) 

    channel.basic_consume(callback, queue='proxy-python', no_ack=True) 
    channel.start_consuming() 

def scenario_info(channel):  
    channel.queue_declare(queue='savi-virnet-python') 
    print (' [*] Waiting for scenrio messages. To exit press CTRL+C') 


    def callback(ch, method, properties, body): 
     print (" Received %s" % (body)) 

    channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True) 
    channel.start_consuming() 

def manager(): 
    connection1= pika.BlockingConnection(pika.ConnectionParameters 
    (host='localhost')) 
    channel1 = connection1.channel() 
    connection2= pika.BlockingConnection(pika.ConnectionParameters 
    (host='localhost')) 
    channel2 = connection2.channel() 
    t1 = threading.Thread(target=client_info, args=(channel1,)) 
    t1.daemon = True 
    threads.append(t1) 
    t1.start() 

    t2 = threading.Thread(target=scenario_info, args=(channel2,)) 
    t2.daemon = True 
    threads.append(t2) 


    t2.start() 
    for t in threads: 
    t.join() 


manager()