2013-12-18 11 views
6

Sto usando basic_consume() per ricevere messaggi e basic_cancel per cancellare il consumo, ma c'è un problema.Come sospendere e riprendere il consumo con garbo in rabbitmq, pika python

Ecco il codice di pika.channel

def basic_consume(self, consumer_callback, queue='', no_ack=False, 
         exclusive=False, consumer_tag=None): 
     """Sends the AMQP command Basic.Consume to the broker and binds messages 
     for the consumer_tag to the consumer callback. If you do not pass in 
     a consumer_tag, one will be automatically generated for you. Returns 
     the consumer tag. 

     For more information on basic_consume, see: 
     http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume 

     :param method consumer_callback: The method to callback when consuming 
     :param queue: The queue to consume from 
     :type queue: str or unicode 
     :param bool no_ack: Tell the broker to not expect a response 
     :param bool exclusive: Don't allow other consumers on the queue 
     :param consumer_tag: Specify your own consumer tag 
     :type consumer_tag: str or unicode 
     :rtype: str 

     """ 
     self._validate_channel_and_callback(consumer_callback) 

     # If a consumer tag was not passed, create one 
     consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_number, 
                 uuid.uuid4().get_hex()) 

     if consumer_tag in self._consumers or consumer_tag in self._cancelled: 
      raise exceptions.DuplicateConsumerTag(consumer_tag) 

     self._consumers[consumer_tag] = consumer_callback 
     self._pending[consumer_tag] = list() 
     self._rpc(spec.Basic.Consume(queue=queue, 
            consumer_tag=consumer_tag, 
            no_ack=no_ack, 
            exclusive=exclusive), 
          self._on_eventok, 
          [(spec.Basic.ConsumeOk, 
          {'consumer_tag': consumer_tag})]) 

     return consumer_tag 

def basic_cancel(self, callback=None, consumer_tag='', nowait=False): 
     """This method cancels a consumer. This does not affect already 
     delivered messages, but it does mean the server will not send any more 
     messages for that consumer. The client may receive an arbitrary number 
     of messages in between sending the cancel method and receiving the 
     cancel-ok reply. It may also be sent from the server to the client in 
     the event of the consumer being unexpectedly cancelled (i.e. cancelled 
     for any reason other than the server receiving the corresponding 
     basic.cancel from the client). This allows clients to be notified of 
     the loss of consumers due to events such as queue deletion. 

     :param method callback: Method to call for a Basic.CancelOk response 
     :param str consumer_tag: Identifier for the consumer 
     :param bool nowait: Do not expect a Basic.CancelOk response 
     :raises: ValueError 

     """ 
     self._validate_channel_and_callback(callback) 
     if consumer_tag not in self.consumer_tags: 
      return 
     if callback: 
      if nowait is True: 
       raise ValueError('Can not pass a callback if nowait is True') 
      self.callbacks.add(self.channel_number, 
           spec.Basic.CancelOk, 
           callback) 
     self._cancelled.append(consumer_tag) 
     self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, 
            nowait=nowait), 
        self._on_cancelok, 
        [(spec.Basic.CancelOk, 
        {'consumer_tag': consumer_tag})] if nowait is False else []) 

Come si può vedere ogni volta che sto annullando consumo consumer_tag viene aggiunto alla lista _canceled. E se usassi questo tag in basic_consume di nuovo, verrà sollevata l'eccezione duplicateConsumer. Bene, potrei usare un nuovo consumer_tag ogni volta, ma in realtà non lo sono. Perché prima o poi il tag generato corrisponderebbe esattamente a quelli precedenti.

Come devo sospendere e riprendere il consumo con garbo in pika?

risposta

1

C'è un motivo per cui si definisce il proprio consumer_tags? È possibile passare una stringa vuota e consentire a RabbitMQ di generare tag di consumo. La risposta da basic.consume, che è basic.consume-ok, restituirà lo consumer_tag generato, quindi è possibile utilizzarlo in seguito per interrompere il consumo.

See: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume-ok

+0

Dal codice allegato del modulo pika, è ovvio che anche nel caso in cui l'applicazione client non specifichi un tag di consumo, il modulo pika ne genererà uno e lo includerà nella richiesta di consumo. – mike

1

Che assomiglia Pika sta facendo più del dovuto - non ha bisogno di creare un tag consumatore se non viene fornito (il server sarà) e inoltre non ha bisogno di guardare i tag di consumo duplicati (la ripresa con lo stesso tag è supportata dal server).

Quindi non sono sicuro di come farlo con Pika, si supponga che sia un errore.