Hai alcune opzioni come hai detto.
Se lo si converte in un argomento per ottenere lo stesso effetto è necessario rendere i consumatori consumatori persistenti. Una cosa che offre la coda è la persistenza se il tuo consumatore non è vivo. Questo dipenderà dal sistema MQ che stai utilizzando.
Se si desidera mantenere le code, verrà creata una coda per ciascun utente e un dispatcher che ascolterà sulla coda originale.
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
Pro di Discussioni
- più facile aggiungere dinamicamente nuovi consumatori. Tutti i consumatori riceveranno nuovi messaggi senza alcun lavoro.
- È possibile creare argomenti round robin, in modo che Consumer_1 riceva un messaggio, quindi Consumer_2, quindi Consumer_3
- I consumatori possono essere inviati nuovi messaggi, invece di dover eseguire query su una coda rendendoli reattivi.
Contro di Discussioni
- I messaggi non sono persistenti a meno che il Broker supporta questa configurazione. Se un utente si disconnette e ritorna, è possibile che i messaggi persi siano a meno che non siano impostati i consumatori permanenti.
- Difficile consentire a Consumer_1 e Consumer_2 di ricevere un messaggio ma non Consumer_3.Con Dispatcher e Code, il Dispatcher non può inserire un messaggio nella coda di Consumer_3.
A favore di code
- messaggi sono permanenti fino a quando un consumatore rimuove li
- Un dispatcher in grado di filtrare i quali i consumatori ottengono quali messaggi da non trasmettere i messaggi nel rispettivi consumatori code. Questo però può essere fatto con argomenti attraverso i filtri.
Contro di code
- code supplementari devono essere creato per supportare più i consumatori. In un ambiente dinamico questo non sarebbe efficiente.
Quando si sviluppa un sistema di messaggistica preferisco argomenti quanto mi dà più potere, ma visto che si sta già utilizzando le code che richiederebbe di modificare come funziona il sistema per implementare argomenti invece.
Progettazione e Realizzazione di sistema di coda con più consumatori
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
Fonte
Tenete a mente ci sono altre cose di cui ha bisogno di prendersi cura di come la manipolazione problema eccezioni, ricollegamento alla connessione e code in caso di perdita della connessione, ecc. Questo è appena progettato per darti un'idea di come realizzare ciò che ho descritto.
In un sistema reale probabilmente non uscirò alla prima eccezione. Vorrei consentire al sistema di continuare a funzionare al meglio e registrare gli errori. Così com'è in questo codice se l'invio di un messaggio in una singola coda di consumatori non riesce, l'intero committente si fermerà.
Dispatcher.java
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package stackoverflow_4615895;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
public class Dispatcher {
private static long QUEUE_WAIT_TIME = 1000;
private boolean mStop = false;
private QueueConnectionFactory mFactory;
private String mSourceQueueName;
private String[] mConsumerQueueNames;
/**
* Create a dispatcher
* @param factory
* The QueueConnectionFactory in which new connections, session, and consumers
* will be created. This is needed to ensure the connection is associated
* with the correct thread.
* @param source
*
* @param consumerQueues
*/
public Dispatcher(
QueueConnectionFactory factory,
String sourceQueue,
String[] consumerQueues) {
mFactory = factory;
mSourceQueueName = sourceQueue;
mConsumerQueueNames = consumerQueues;
}
public void start() {
Thread thread = new Thread(new Runnable() {
public void run() {
Dispatcher.this.run();
}
});
thread.setName("Queue Dispatcher");
thread.start();
}
public void stop() {
mStop = true;
}
private void run() {
QueueConnection connection = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
QueueSession session = null;
try {
// Setup connection and queues for receiving the messages
connection = mFactory.createQueueConnection();
session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue sourceQueue = session.createQueue(mSourceQueueName);
consumer = session.createConsumer(sourceQueue);
// Create a null producer allowing us to send messages
// to any queue.
producer = session.createProducer(null);
// Create the destination queues based on the consumer names we
// were given.
Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
for (int index = 0; index < mConsumerQueueNames.length; ++index) {
destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
}
connection.start();
while (!mStop) {
// Only wait QUEUE_WAIT_TIME in order to give
// the dispatcher a chance to see if it should
// quit
Message m = consumer.receive(QUEUE_WAIT_TIME);
if (m == null) {
continue;
}
// Take the message we received and put
// it in each of the consumers destination
// queues for them to process
for (Queue q : destinationQueues) {
producer.send(q, m);
}
}
} catch (JMSException ex) {
// Do wonderful things here
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException ex) {
}
}
if (consumer != null) {
try {
consumer.close();
} catch (JMSException ex) {
}
}
if (session != null) {
try {
session.close();
} catch (JMSException ex) {
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
}
}
Main.java
QueueConnectionFactory factory = ...;
Dispatcher dispatcher =
new Dispatcher(
factory,
"Queue_Original",
new String[]{
"Consumer_Queue_1",
"Consumer_Queue_2",
"Consumer_Queue_3"});
dispatcher.start();
+1 buona risposta. – skaffman
Questa è stata un'ottima risposta. Sto usando l'implementazione MOM di JBoss che è HornetQ. –
@Anonimo L'ultima volta che ho controllato JBoss aderisce assolutamente alle specifiche JMS. Ciò mi ha causato qualche problema in passato perché creo argomenti in modo dinamico a cui le specifiche JMS non tengono conto. Altri come ActiveMQ ti consentono di creare argomenti in modo dinamico e richiede solo 1 riga di modifica del codice in JBoss per consentire la stessa funzionalità. –