2014-11-10 21 views
6

Dopo Google per alcuni giorni, e credo di essermi completamente perso. Vorrei implementare una sorta di coda di priorità con circa 3 code:Spring AMQP RabbitMQ che implementa la coda di priorità

  1. coda ad alta priorità (giornaliera), che deve essere elaborata per prima.
  2. coda di priorità media (settimanale), che verrà elaborata se nessun elemento nella coda n. (è un messaggio ok in questa coda che non viene mai elaborato)
  3. coda a bassa priorità (mensile), che verrà elaborata se nessun elemento nella coda n. 1 & # 2. (è un messaggio ok in questa coda non si elabora mai)

Inizialmente ho il seguente flusso, per consentire a un utente di consumare messaggi da tutte e tre le code e verificare se ci sono articoli nella coda n. 1, # 2 e # 3. e poi mi rendo conto che questo è sbagliato perché:

  1. Sono totalmente perso con una domanda: "Come faccio a sapere da quale coda proviene?".
  2. Sto già consumando un messaggio indipendentemente da qualsiasi coda, quindi se ottengo un oggetto dalla coda con priorità più bassa, lo rimetterò in coda se scoprirò che c'è un messaggio nella coda con priorità più alta?

Di seguito sono le mie configurazioni correnti, che mostra ciò che sono un idiota.

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd 
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> 

<rabbit:connection-factory id="connectionFactory" host="localhost" /> 

<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory" 
    exchange="" routing-key="daily_queue"/> 

<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory" 
    exchange="" routing-key="weekly_queue"/> 

<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory" 
    exchange="" routing-key="monthly_queue"/> 

<rabbit:admin connection-factory="connectionFactory" /> 

<rabbit:listener-container connection-factory="connectionFactory"> 
    <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" /> 
</rabbit:listener-container> 

<rabbit:listener-container connection-factory="connectionFactory"> 
    <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" /> 
</rabbit:listener-container>  

<rabbit:listener-container connection-factory="connectionFactory"> 
    <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" /> 
</rabbit:listener-container>  

<bean id="Consumer" class="com.test.Consumer" /> 

</beans> 

Qualche idea su come dovrei affrontare questo problema con la coda prioritaria?

ps: Mi chiedo anche, se Apache Camel ha qualcosa su cui posso contare?

AGGIORNAMENTO 1: Ho appena visto questo da Apache Camel: "https://issues.apache.org/jira/browse/CAMEL-2537" il sequencer su JMSPriority sembra essere quello che sto cercando, qualcuno ha provato prima?

UPDATE 2: supponendo che sto per usare base di plug-in di RabbitMQ su raccomandazione @Gary Russell, ho la seguente configurazione XML contesto primavera-RabbitMQ, che sembra avere un senso (da ospite ..):

<rabbit:queue name="ad_google_dfa_reporting_queue"> 
    <rabbit:queue-arguments> 
      <entry key="x-max-priority" value="10"/> 
    </rabbit:queue-arguments> 
</rabbit:queue> 

<rabbit:listener-container connection-factory="connectionFactory"> 
    <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" /> 
</rabbit:listener-container> 

<bean id="Consumer" class="com.test.Consumer" /> 

la configurazione XML sopra è creare con successo una coda, con il nome: "ad_google_dfa_reporting_queue", e con argomenti parametri: x-max-priority: 10 & durevole: vero

Ma non quando arriva al codice che invia il messaggio con priorità, l'ho perso completamente. Come definire la priorità come menzione nel URL del campione: https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java

AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting"); 
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority? 

UPDATE 3: Sulla base della @ risposta di Gary, riesco a messaggio inviato con la priorità impostata nel messaggio, come da immagine qui sotto: Tuttavia, quando ho inviato 1000 messaggi con priorità casuale tra 1-10, il consumatore sta consumando messaggi con tutti i tipi di priorità. (Mi aspettavo che solo il messaggio ad alta priorità fosse consumato per primo).Di seguito è riportato il codice per il produttore Messaggio:

Random random = new Random(); 
    for (int i=0; i< 1000; i++){ 
     final int priority = random.nextInt(10 - 1 + 1) + 1; 

     DfaReportingModel model = new DfaReportingModel(); 
     model.setReportType(DfaReportingModel.ReportType.FACT); 
     model.setUserProfileId(0l + priority); 
     amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() { 
      @Override 
      public Message postProcessMessage(Message message) throws AmqpException { 
       message.getMessageProperties().setPriority(priority); 
       return message; 
      } 
     }); 
    } 

E seguente è il codice per il messaggio di consumatori:

public void consume(DfaReportingModel message) { 
     System.out.println(message.getUserProfileId()); 

     Thread.sleep(500); 
    } 

Il risultato nel mese di ottenere:

9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1, 

AGGIORNAMENTO 4: Problema risolto! Conoscendo il codice di esempio da https://github.com/rabbitmq/rabbitmq-priority-queue sta lavorando nel mio ambiente, presumo che il problema è intorno al contesto di primavera. Quindi, dopo un tempo infinito di tentativi ed errori con diversi tipi di configurazioni, e punto il punto esatto della combinazione che farà funzionare! ed è secondo il seguente:

<rabbit:queue name="ad_google_dfa_reporting_queue"> 
    <rabbit:queue-arguments> 
     <entry key="x-max-priority"> 
      <value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work --> 
     </entry> 
    </rabbit:queue-arguments> 
</rabbit:queue> 

Senza specificamente definire il valore è di tipo intero, la coda di priorità non funziona. Alla fine, è risolto. Sìì!

+0

Per la p.s. Suggerisco di aggiungere un tag Apache Camel – mjn

+0

@mjn fatto. apache-camel aggiunto. – Reusable

+0

@ ben75 grazie !! mi chiedevo come mettere in ordine i fatti :) – Reusable

risposta

2

RabbitMQ ora ha un priority queue plugin dove i messaggi vengono consegnati in ordine di priorità. Sarebbe meglio usare quello piuttosto che il tuo schema di riaccodare i messaggi a bassa priorità che saranno piuttosto costosi in fase di runtime.

EDIT:

Quando si usano le metodi, e si desidera impostare la proprietà di priorità sul messaggio, vi sia bisogno di implementare una consuetudine MessagePropertiesConverter nel modello (sottoclasse la DefaultMessagePropertiesConverter) oppure utilizzare il convertAnSend varianti che accettano un messaggio post-processore; es .:

template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() { 

    @Override 
    public Message postProcessMessage(Message message) throws AmqpException { 
     message.getMessageProperties().setPriority(5); 
     return message; 
    } 
}); 
+0

Ho visto questo plugin ed ero confuso su come implementare il plugin con Spring-rabbitmq. C'è qualche esempio di contesto xml che utilizza questo plugin? – Reusable

+0

Russel riesco a "indovinare" la configurazione di contesto xml più probabile, ma quando arriva al codice java che fa template.convertandsend(); il documento non ha alcun riferimento su come inviare la priorità. traccia? – Reusable

+0

Ho modificato la risposta. –

0

RabbitMQ trovi priority queue attuazione nel nucleo dalla versione 3.5.0.

È possibile dichiarare le code di priorità utilizzando l'argomento x-max-priority. Questo argomento dovrebbe essere un numero intero che indica la priorità massima che la coda dovrebbe supportare. Ad esempio, utilizzando il client Java:

Channel ch = ...; 
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-max-priority", 10); 
ch.queueDeclare("my-priority-queue", true, false, false, args); 

È quindi possibile pubblicare messaggi prioritari utilizzando il campo prioritario di basic.properties. I numeri più grandi indicano una priorità più alta.