La risposta a "qual è il modo migliore" dipende in gran parte dal modello di utilizzo delle code e da cosa intendi per "migliore". Dato che non posso ancora commentare le domande, cercherò di suggerire alcune possibili soluzioni.
In ogni esempio, assumerò che lo scambio sia già stato dichiarato.
discussioni
Si può consumare messaggio per due code su host separati in unico processo utilizzando pika
.
Hai ragione - come its own FAQ states, pika
non è thread-safe, ma può essere utilizzato in modalità multi-thread creando connessioni a host RabbitMQ per thread. Fare questo esempio eseguito in thread utilizzando il modulo threading
appare come segue:
import pika
import threading
class ConsumerThread(threading.Thread):
def __init__(self, host, *args, **kwargs):
super(ConsumerThread, self).__init__(*args, **kwargs)
self._host = host
# Not necessarily a method.
def callback_func(self, channel, method, properties, body):
print("{} received '{}'".format(self.name, body))
def run(self):
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self._host,
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(self.callback_func,
result.method.queue,
no_ack=True)
channel.start_consuming()
if __name__ == "__main__":
threads = [ConsumerThread("host1"), ConsumerThread("host2")]
for thread in threads:
thread.start()
ho dichiarato callback_func
come un metodo puramente da utilizzare durante la stampa ConsumerThread.name
corpo del messaggio. Potrebbe anche essere una funzione esterna alla classe ConsumerThread
.
Processi
In alternativa, si può sempre e solo eseguire un unico processo con codice del consumo per coda che si desidera consumare eventi.
import pika
import sys
def callback_func(channel, method, properties, body):
print(body)
if __name__ == "__main__":
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=sys.argv[1],
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
e poi gestita da:
$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console
Se il lavoro che stai facendo sul messaggio per code è CPU-heavy e finché il numero di core nella CPU> = numero di consumatori, generalmente è meglio usare questo approccio, a meno che le code non siano vuote il più delle volte e gli utenti non utilizzeranno questa CPU *.
asincrono
Un'altra alternativa è quella di comportare una certa quadro asincrona (ad esempio Twisted
) e funzionante tutto in unico filo.
Non è più possibile utilizzare BlockingConnection
in codice asincrono; fortunatamente, pika
ha adattatore per il Twisted
:
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log
class Consumer(object):
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)
def got_channel(self, channel):
self.channel = channel
return self.channel.queue_declare(exclusive=True)
def queue_declared(self, queue):
self._queue_name = queue.method.queue
self.channel.queue_bind(queue=self._queue_name,
exchange="my-exchange",
routing_key="*.*.*.*.*")
def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)
def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
return self.looping_call.start(0)
def consume_from_queue(self, queue):
d = queue.get()
return d.addCallback(lambda result: self.handle_payload(*result))
def handle_payload(self, channel, method, properties, body):
print(body)
if __name__ == "__main__":
consumer1 = Consumer()
consumer2 = Consumer()
parameters = ConnectionParameters()
cc = protocol.ClientCreator(reactor,
TwistedProtocolConnection,
parameters)
d1 = cc.connectTCP("host1", 5672)
d1.addCallback(lambda protocol: protocol.ready)
d1.addCallback(consumer1.on_connected)
d1.addErrback(log.err)
d2 = cc.connectTCP("host2", 5672)
d2.addCallback(lambda protocol: protocol.ready)
d2.addCallback(consumer2.on_connected)
d2.addErrback(log.err)
reactor.run()
Questo approccio sarebbe ancora meglio, i più code che ci si consumano da e meno CPU-bound il lavoro eseguire da parte dei consumatori è *.
Python 3
Dal momento che hai citato pika
, mi sono limitato a soluzioni Python 2.x-based, in quanto non è ancora pika
porting.
Ma nel caso in cui si desideri passare a> = 3.3, una possibile opzione è quella di utilizzare asyncio
con uno dei protocolli AMQP (il protocollo con cui si parla con RabbitMQ), ad es. asynqp
o aioamqp
.
* - si prega di notare che questi sono consigli molto superficiali - nella maggior parte dei casi la scelta non è così ovvia; quale sarà il meglio per te dipende dalla "saturazione" delle code (messaggi/ora), che lavoro fai dopo aver ricevuto questi messaggi, quale ambiente gestisci i tuoi consumatori in ecc .; non c'è modo per essere sicuri diverso da quello di tutte le implementazioni di riferimento
Grazie, molto utile. – blindsnowmobile
Prego. :) Ho anche indicato un'altra cosa nella modifica. – Unit03