2013-01-23 6 views
9

MODIFICA: Ha riformulato la domanda:ActiveMQ e broker incorporato

Desidero utilizzare ActiveMQ come servizio di messaggistica tra il mio server e le applicazioni client.

Sto tentando di impostare un broker incorporato (vale a dire non un processo separato) all'interno del server per gestire i messaggi prodotti per i miei clienti. Questa coda è persistente.

L'inizializzazione mediatore come segue:

BrokerService broker = new BrokerService(); 
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(); 
adaptor.setDirectory(new File("activemq")); 
broker.setPersistenceAdapter(adaptor); 
broker.setUseJmx(true); 
broker.addConnector("tcp://localhost:61616"); 
broker.start(); 

Dopo armeggiare, ho finito con la parte server dell'essere:

public static class HelloWorldProducer implements Runnable { 
    public void run() { 
     try { 
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need 
      Connection connection = connectionFactory.createConnection(); 
      connection.start(); 
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      Destination destination = session.createQueue("TEST.FOO"); 
      MessageProducer producer = session.createProducer(destination); 
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
      String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); 
      TextMessage message = session.createTextMessage(text); 
      System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName()); 
      producer.send(message); 
      session.close(); 
      connection.close(); 
     } 
     catch (Exception e) { 
      System.out.println("Caught: " + e); 
      e.printStackTrace(); 
     } 
    } 
} 

Il cliente è molto simile e si presenta così:

Il metodo principale inizia semplicemente ognuno di questi in una discussione per iniziare a produrre/ricevere messaggi GES.

... ma io sono in esecuzione in quanto segue con l'inizio di ogni discussione:

2013-01-24 07:54:31,271 INFO [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost) 
2013-01-24 07:54:31,281 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost 
2013-01-24 07:54:31,302 INFO [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state 
2013-01-24 07:54:31,339 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: [] 
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService 
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry 
    java.rmi.server.ExportException: internal error: ObjID already in use 
    at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186) 
    at sun.rmi.transport.Transport.exportObject(Transport.java:92) 
    at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247) 
    at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411) 
    at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147) 
     <snip....> 

Sembra che i messaggi vengono prodotti e consumati con successo (le altre questioni che ho precedentemente postato su è stato risolto), ma l'eccezione di cui sopra mi preoccupa.

EDIT: Durante l'arresto broker, ora sto salutato anche dal seguente:

2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception: 
    java.io.EOFException 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) 
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269) 
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210) 
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202) 
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) 
    at java.lang.Thread.run(Thread.java:722) 
+0

Dovresti includere tutto il codice di creazione del broker in modo che possiamo vedere tutto ciò che hai configurato. Puoi disabilitare JMX se non ti serve tramite broker.setUseJmx (false); –

+0

Per chiarire, il messaggio visualizzato è di livello DEBUG. Questo non è necessariamente un ERRORE. Potrebbe essere solo informativo. Stai riscontrando un errore nella produzione/consumo di messaggi? La domanda non è chiara su questo punto. – cmonkey

+0

Ho riformulato completamente la domanda. Essenzialmente sto chiedendo 3 sotto-domande. (1) L'eccezione, (2) Messaggi persi e (3) Persistenza. Grazie per aver guardato la mia domanda. –

risposta

11

è possibile incorporare un broker nel codice in un certo numero di modi, gran parte del quale è documentata here. Potresti voler provare ad aggiornare la tua versione dal momento che ciò che stai utilizzando sembra essere piuttosto vecchio in quanto è di default per l'AMQ Store ormai obsoleto invece del più recente store KahaDB. Potresti avere problemi a causa di una corsa tra i thread dei client mentre utilizzano le diverse factory di connessione che potrebbero correre per creare nei broker VM. Se imposti l'opzione create = false sul produttore e assicurati che il thread del consumatore inizi dopo che potrebbe risolvere il problema, o potresti creare il broker VM prima del tempo e aggiungere create = false a entrambi i thread e questo potrebbe fare il trucco.

BrokerService broker = new BrokerService(); 
// configure the broker 
broker.setBrokerName("localhost"); 
broker.setUseJmx(false); 
broker.start(); 

E quindi nel codice client allegare tramite questa configurazione di fabbrica di connessione.

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false"); 
+0

Grazie, ce l'ho. Tuttavia, sono ancora preoccupato per l'eccezione sopra. Sai se può essere tranquillamente ignorato? –

+1

L'eccezione sembra dirti che la connessione client non è riuscita perché hai arrestato il broker mentre il client era ancora connesso. Non è un grosso problema se stai chiudendo l'app. Sembra che il tuo client stia utilizzando TCP invece di VM nella sua factory di connessione, anche se non è del tutto chiaro dalla domanda. –

+0

Sì ... questo è davvero il problema e l'eccezione può essere tranquillamente ignorata. Per evitarlo, i client devono essere arrestati prima del server (che include il broker). –

4

Quando eseguo il codice, ho avuto l'eccezione di seguito:

javax.jms.JMSException: Could not connect to broker URL: tcp://localhost. 
Reason java.lang.IllegalArgumentException: port out of range:-1 

vostro broker è in esecuzione e in ascolto sulla porta 61616, quindi qualsiasi client che tenta di connettersi al broker di bisogno di avere il porto nel suo URL.

Il codice client tenta di connettersi a localhost ma non menziona la porta a cui deve connettersi. Sia il codice produttore che il codice cliente devono essere corretti.

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); 

Per

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 

Dopo aver sistemato la porta, sono stato in grado di eseguire il codice.

+0

Grazie Satish. Sono riuscito a farlo funzionare. Mi scuso, avrei dovuto chiudere la domanda. –