2011-01-06 9 views
20

Ho un client JMS che sta producendo messaggi e inviando una coda JMS al suo unico consumatore.JMS - Passaggio da uno a più utenti

Quello che voglio è che più di un consumatore ottenga quei messaggi. La prima cosa che mi viene in mente è convertire la coda in un argomento, in modo che i consumatori attuali e nuovi possano iscriversi e ricevere lo stesso messaggio a tutti loro.

Ciò comporterà ovviamente la modifica del codice client corrente sia dal lato produttore che da quello consumer.

Vorrei anche esaminare altre opzioni come creare una seconda coda, in modo da non dover modificare il consumatore esistente. Credo che ci siano vantaggi in questo approccio come (correggimi se sbaglio) bilanciare il carico tra due code diverse anziché una, il che potrebbe avere un impatto positivo sulle prestazioni.

Vorrei ricevere consigli su queste opzioni e contro/pro che potresti vedere. Qualsiasi feedback è molto apprezzato.

risposta

45

Hai alcune opzioni come hai detto.

Se lo si converte in un argomento per ottenere lo stesso effetto è necessario rendere i consumatori consumatori persistenti. Una cosa che offre la coda è la persistenza se il tuo consumatore non è vivo. Questo dipenderà dal sistema MQ che stai utilizzando.

Se si desidera mantenere le code, verrà creata una coda per ciascun utente e un dispatcher che ascolterà sulla coda originale.

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
             -> Queue_Consumer_2 <- Consumer_2 
             -> Queue_Consumer_3 <- Consumer_3 

Pro di Discussioni

  • più facile aggiungere dinamicamente nuovi consumatori. Tutti i consumatori riceveranno nuovi messaggi senza alcun lavoro.
  • È possibile creare argomenti round robin, in modo che Consumer_1 riceva un messaggio, quindi Consumer_2, quindi Consumer_3
  • I consumatori possono essere inviati nuovi messaggi, invece di dover eseguire query su una coda rendendoli reattivi.

Contro di Discussioni

  • I messaggi non sono persistenti a meno che il Broker supporta questa configurazione. Se un utente si disconnette e ritorna, è possibile che i messaggi persi siano a meno che non siano impostati i consumatori permanenti.
  • Difficile consentire a Consumer_1 e Consumer_2 di ricevere un messaggio ma non Consumer_3.Con Dispatcher e Code, il Dispatcher non può inserire un messaggio nella coda di Consumer_3.

A favore di code

  • messaggi sono permanenti fino a quando un consumatore rimuove li
  • Un dispatcher in grado di filtrare i quali i consumatori ottengono quali messaggi da non trasmettere i messaggi nel rispettivi consumatori code. Questo però può essere fatto con argomenti attraverso i filtri.

Contro di code

  • code supplementari devono essere creato per supportare più i consumatori. In un ambiente dinamico questo non sarebbe efficiente.

Quando si sviluppa un sistema di messaggistica preferisco argomenti quanto mi dà più potere, ma visto che si sta già utilizzando le code che richiederebbe di modificare come funziona il sistema per implementare argomenti invece.

Progettazione e Realizzazione di sistema di coda con più consumatori

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
             -> Queue_Consumer_2 <- Consumer_2 
             -> Queue_Consumer_3 <- Consumer_3 

Fonte

Tenete a mente ci sono altre cose di cui ha bisogno di prendersi cura di come la manipolazione problema eccezioni, ricollegamento alla connessione e code in caso di perdita della connessione, ecc. Questo è appena progettato per darti un'idea di come realizzare ciò che ho descritto.

In un sistema reale probabilmente non uscirò alla prima eccezione. Vorrei consentire al sistema di continuare a funzionare al meglio e registrare gli errori. Così com'è in questo codice se l'invio di un messaggio in una singola coda di consumatori non riesce, l'intero committente si fermerà.

Dispatcher.java

/* 
* To change this template, choose Tools | Templates 
* and open the template in the editor. 
*/ 
package stackoverflow_4615895; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Queue; 
import javax.jms.QueueConnection; 
import javax.jms.QueueConnectionFactory; 
import javax.jms.QueueSession; 
import javax.jms.Session; 

public class Dispatcher { 

    private static long QUEUE_WAIT_TIME = 1000; 
    private boolean mStop = false; 
    private QueueConnectionFactory mFactory; 
    private String mSourceQueueName; 
    private String[] mConsumerQueueNames; 

    /** 
    * Create a dispatcher 
    * @param factory 
    *  The QueueConnectionFactory in which new connections, session, and consumers 
    *  will be created. This is needed to ensure the connection is associated 
    *  with the correct thread. 
    * @param source 
    * 
    * @param consumerQueues 
    */ 
    public Dispatcher(
     QueueConnectionFactory factory, 
     String sourceQueue, 
     String[] consumerQueues) { 

     mFactory = factory; 
     mSourceQueueName = sourceQueue; 
     mConsumerQueueNames = consumerQueues; 
    } 

    public void start() { 
     Thread thread = new Thread(new Runnable() { 

      public void run() { 
       Dispatcher.this.run(); 
      } 
     }); 
     thread.setName("Queue Dispatcher"); 
     thread.start(); 
    } 

    public void stop() { 
     mStop = true; 
    } 

    private void run() { 

     QueueConnection connection = null; 
     MessageProducer producer = null; 
     MessageConsumer consumer = null; 
     QueueSession session = null; 
     try { 
      // Setup connection and queues for receiving the messages 
      connection = mFactory.createQueueConnection(); 
      session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
      Queue sourceQueue = session.createQueue(mSourceQueueName); 
      consumer = session.createConsumer(sourceQueue); 

      // Create a null producer allowing us to send messages 
      // to any queue. 
      producer = session.createProducer(null); 

      // Create the destination queues based on the consumer names we 
      // were given. 
      Queue[] destinationQueues = new Queue[mConsumerQueueNames.length]; 
      for (int index = 0; index < mConsumerQueueNames.length; ++index) { 
       destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]); 
      } 

      connection.start(); 

      while (!mStop) { 

       // Only wait QUEUE_WAIT_TIME in order to give 
       // the dispatcher a chance to see if it should 
       // quit 
       Message m = consumer.receive(QUEUE_WAIT_TIME); 
       if (m == null) { 
        continue; 
       } 

       // Take the message we received and put 
       // it in each of the consumers destination 
       // queues for them to process 
       for (Queue q : destinationQueues) { 
        producer.send(q, m); 
       } 
      } 

     } catch (JMSException ex) { 
      // Do wonderful things here 
     } finally { 
      if (producer != null) { 
       try { 
        producer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (consumer != null) { 
       try { 
        consumer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (session != null) { 
       try { 
        session.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (connection != null) { 
       try { 
        connection.close(); 
       } catch (JMSException ex) { 
       } 
      } 
     } 
    } 
} 

Main.java

QueueConnectionFactory factory = ...; 

    Dispatcher dispatcher = 
      new Dispatcher(
      factory, 
      "Queue_Original", 
      new String[]{ 
       "Consumer_Queue_1", 
       "Consumer_Queue_2", 
       "Consumer_Queue_3"}); 
    dispatcher.start(); 
+0

+1 buona risposta. – skaffman

+0

Questa è stata un'ottima risposta. Sto usando l'implementazione MOM di JBoss che è HornetQ. –

+0

@Anonimo L'ultima volta che ho controllato JBoss aderisce assolutamente alle specifiche JMS. Ciò mi ha causato qualche problema in passato perché creo argomenti in modo dinamico a cui le specifiche JMS non tengono conto. Altri come ActiveMQ ti consentono di creare argomenti in modo dinamico e richiede solo 1 riga di modifica del codice in JBoss per consentire la stessa funzionalità. –

4

Potrebbe non essere necessario modificare il codice; dipende da come l'hai scritto.

Ad esempio, se il codice invia messaggi utilizzando MessageProducer anziché QueueSender, funzionerà per argomenti e code. Allo stesso modo se hai utilizzato MessageConsumer anziché QueueReceiver.

Essenzialmente, è buona norma nelle applicazioni JMS utilizzare interfacce non specifici per interagire con il sistema JMS, come MessageProducer, MessageConsumer, Destination, ecc Se questo è il caso, è un "semplice" questione di configurazione.

+0

che sarebbe una buona opzione. Sfortunatamente stiamo usando interfacce specifiche come QueueSender. Questo è sicuramente qualcosa che terrò a mente se refactoring. –