Dopo Google per alcuni giorni, e credo di essermi completamente perso. Vorrei implementare una sorta di coda di priorità con circa 3 code:Spring AMQP RabbitMQ che implementa la coda di priorità
- coda ad alta priorità (giornaliera), che deve essere elaborata per prima.
- coda di priorità media (settimanale), che verrà elaborata se nessun elemento nella coda n. (è un messaggio ok in questa coda che non viene mai elaborato)
- coda a bassa priorità (mensile), che verrà elaborata se nessun elemento nella coda n. 1 & # 2. (è un messaggio ok in questa coda non si elabora mai)
Inizialmente ho il seguente flusso, per consentire a un utente di consumare messaggi da tutte e tre le code e verificare se ci sono articoli nella coda n. 1, # 2 e # 3. e poi mi rendo conto che questo è sbagliato perché:
- Sono totalmente perso con una domanda: "Come faccio a sapere da quale coda proviene?".
- Sto già consumando un messaggio indipendentemente da qualsiasi coda, quindi se ottengo un oggetto dalla coda con priorità più bassa, lo rimetterò in coda se scoprirò che c'è un messaggio nella coda con priorità più alta?
Di seguito sono le mie configurazioni correnti, che mostra ciò che sono un idiota.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<rabbit:connection-factory id="connectionFactory" host="localhost" />
<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
exchange="" routing-key="daily_queue"/>
<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
exchange="" routing-key="weekly_queue"/>
<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
exchange="" routing-key="monthly_queue"/>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>
<bean id="Consumer" class="com.test.Consumer" />
</beans>
Qualche idea su come dovrei affrontare questo problema con la coda prioritaria?
ps: Mi chiedo anche, se Apache Camel ha qualcosa su cui posso contare?
AGGIORNAMENTO 1: Ho appena visto questo da Apache Camel: "https://issues.apache.org/jira/browse/CAMEL-2537" il sequencer su JMSPriority sembra essere quello che sto cercando, qualcuno ha provato prima?
UPDATE 2: supponendo che sto per usare base di plug-in di RabbitMQ su raccomandazione @Gary Russell, ho la seguente configurazione XML contesto primavera-RabbitMQ, che sembra avere un senso (da ospite ..):
<rabbit:queue name="ad_google_dfa_reporting_queue">
<rabbit:queue-arguments>
<entry key="x-max-priority" value="10"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>
<bean id="Consumer" class="com.test.Consumer" />
la configurazione XML sopra è creare con successo una coda, con il nome: "ad_google_dfa_reporting_queue", e con argomenti parametri: x-max-priority: 10 & durevole: vero
Ma non quando arriva al codice che invia il messaggio con priorità, l'ho perso completamente. Come definire la priorità come menzione nel URL del campione: https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java
AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?
UPDATE 3: Sulla base della @ risposta di Gary, riesco a messaggio inviato con la priorità impostata nel messaggio, come da immagine qui sotto: Tuttavia, quando ho inviato 1000 messaggi con priorità casuale tra 1-10, il consumatore sta consumando messaggi con tutti i tipi di priorità. (Mi aspettavo che solo il messaggio ad alta priorità fosse consumato per primo).Di seguito è riportato il codice per il produttore Messaggio:
Random random = new Random();
for (int i=0; i< 1000; i++){
final int priority = random.nextInt(10 - 1 + 1) + 1;
DfaReportingModel model = new DfaReportingModel();
model.setReportType(DfaReportingModel.ReportType.FACT);
model.setUserProfileId(0l + priority);
amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(priority);
return message;
}
});
}
E seguente è il codice per il messaggio di consumatori:
public void consume(DfaReportingModel message) {
System.out.println(message.getUserProfileId());
Thread.sleep(500);
}
Il risultato nel mese di ottenere:
9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,
AGGIORNAMENTO 4: Problema risolto! Conoscendo il codice di esempio da https://github.com/rabbitmq/rabbitmq-priority-queue sta lavorando nel mio ambiente, presumo che il problema è intorno al contesto di primavera. Quindi, dopo un tempo infinito di tentativi ed errori con diversi tipi di configurazioni, e punto il punto esatto della combinazione che farà funzionare! ed è secondo il seguente:
<rabbit:queue name="ad_google_dfa_reporting_queue">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
Senza specificamente definire il valore è di tipo intero, la coda di priorità non funziona. Alla fine, è risolto. Sìì!
Per la p.s. Suggerisco di aggiungere un tag Apache Camel – mjn
@mjn fatto. apache-camel aggiunto. – Reusable
@ ben75 grazie !! mi chiedevo come mettere in ordine i fatti :) – Reusable