2011-08-27 13 views
12

Ho lavorato con JMS e ActiveMQ. Tutto sta funzionando a meraviglia. Non sto utilizzando la molla, né posso I.Segnalare un rollback da un JMS MessageListener

L'interfaccia javax.jms.MessageListener ha un solo metodo, onMessage. Dall'interno di un'implementazione, c'è la possibilità che venga lanciata un'eccezione. Se in effetti viene lanciata un'eccezione, allora dico che il messaggio non è stato elaborato correttamente e deve essere ripetuto. Quindi, ho bisogno di ActiveMQ per aspettare un po 'e poi, riprovare. Ad esempio, ho bisogno dell'eccezione generata per il rollback della transazione JMS.

Come posso eseguire un simile comportamento?

Forse c'è qualche configurazione in ActiveMQ che non ero in grado di trovare.

Oppure ... forse potrebbe farla finita con la registrazione MessageListener s per i consumatori e consumare i messaggi io stesso, in un un ciclo come:

while (true) { 
    // ... some administrative stuff like ... 
    session = connection.createSesstion(true, SESSION_TRANSACTED) 
    try { 
     Message m = receiver.receive(queue, 1000L); 
     theMessageListener.onMessage(m); 
     session.commit(); 
    } catch (Exception e) { 
     session.rollback(); 
     Thread.sleep(someTimeDefinedSomewhereElse); 
    } 
    // ... some more administrative stuff 
} 

in un paio di fili, invece di registrare l'ascoltatore.

Oppure ... In qualche modo potrei decorare/AOP/byte-manipolare il MessageListener s per farlo da soli.

Quale strada prenderesti e perché?

note: Non ho il controllo completo sul codice MessageListener s.

EDIT Un test per prova di concetto:

@Test 
@Ignore("Interactive test, just a proof of concept") 
public void transaccionConListener() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     @Override 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         message.acknowledge(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         message.acknowledge(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    brokerService.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 
+0

Grazie mille whaley e @Ammar per le risposte. Sto svalutando entrambi, poiché entrambi mi hanno indirizzato nella giusta direzione. Ma non scegliere ancora una risposta giusta. Perché sono necessari più test. –

risposta

10

Se si desidera utilizzare SESSION_TRANSACTED come la modalità di riconoscimento, allora avete bisogno di impostare una RedeliveryPolicy on your Connection/ConnectionFactory. This page on ActiveMQ's website contiene anche alcune informazioni utili per ciò che potrebbe essere necessario fare.

Dal momento che non si utilizza Primavera, è possibile impostare un RedeliveryPolicy con qualcosa di simile al codice seguente (preso da uno dei link sopra):

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); 
policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

Modifica Prendendo la vostra snippet di codice aggiunto alla risposta, il seguente mostra come funziona con le transazioni. Prova questo codice con il metodo Session.rollback() commentato e lo vedrai usando SESION_TRANSACTED e Session.commit/rollback funziona come previsto:

@Test 
public void test() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         session.commit(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         session.rollback(); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         session.commit(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 

}

+0

Questo non ha funzionato. Ma mi ha indirizzato nella giusta direzione. Lascerò DUPS_OK_ACKNOWLEDGE dal momento che sembra essere quello che funziona che devo lavorare di meno. –

+0

È necessario incollare l'intero codice, perché non si sta facendo qualcosa correttamente con la sessione. DUPS_OK_ACKNOWLEDGE sembra funzionare solo perché il riconoscimento del client è pigro e il broker continuerà a inviare nuovamente i messaggi fino a quando il client non eseguirà l'ack. – whaley

+0

Ho incollato una prova di concetto. Posso farlo funzionare solo con DUPS_OK_ACKNOWLEDGE e il messaggio.allenamento non sembra fare la differenza. –

2

È necessario impostare la modalità di riconoscimento al Session.CLIENT_ACKNOWLEDGE, il cliente riconosce un messaggio consumato chiamando il metodo riconoscere il messaggio.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Poi, dopo l'elaborazione del messaggio ad avere bisogno di chiamare il metodo Message.acknowledge() al fine di rimuovere il messaggio.

Message message = ...; 
// Processing message 

message.acknowledge(); 
+0

Non funziona. _onMessage_ viene ancora chiamato una volta, anche se _message.acknowledge() _ non viene mai chiamato. –

+0

Hai impostato correttamente la modalità di riconoscimento? Deve essere impostato su Session.CLIENT_ACKNOWLEDGE! – Ammar

+0

Ma funziona con (false, Session.DUPS_OK_ACKNOWLEDGE) ... message.acknowledge() non sembra fare il trucco. –

0

Se la sessione viene negoziato, poi "acknowledgeMode" viene ignorato anyways..So, basta lasciare la sessione di transato e utilizzare session.rollback e session.commit per eseguire il commit o il rollback della transazione.

+1

Penso che il (mio) problema sia che la sessione non è accessibile all'interno di MessageListener.onMessage (Message). –