Ho lavorato con JMS e ActiveMQ. Tutto sta funzionando a meraviglia. Non sto utilizzando la molla, né posso I.Segnalare un rollback da un JMS MessageListener
L'interfaccia javax.jms.MessageListener
ha un solo metodo, onMessage
. Dall'interno di un'implementazione, c'è la possibilità che venga lanciata un'eccezione. Se in effetti viene lanciata un'eccezione, allora dico che il messaggio non è stato elaborato correttamente e deve essere ripetuto. Quindi, ho bisogno di ActiveMQ per aspettare un po 'e poi, riprovare. Ad esempio, ho bisogno dell'eccezione generata per il rollback della transazione JMS.
Come posso eseguire un simile comportamento?
Forse c'è qualche configurazione in ActiveMQ che non ero in grado di trovare.
Oppure ... forse potrebbe farla finita con la registrazione MessageListener
s per i consumatori e consumare i messaggi io stesso, in un un ciclo come:
while (true) {
// ... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
// ... some more administrative stuff
}
in un paio di fili, invece di registrare l'ascoltatore.
Oppure ... In qualche modo potrei decorare/AOP/byte-manipolare il MessageListener
s per farlo da soli.
Quale strada prenderesti e perché?
note: Non ho il controllo completo sul codice MessageListener
s.
EDIT Un test per prova di concetto:
@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
Grazie mille whaley e @Ammar per le risposte. Sto svalutando entrambi, poiché entrambi mi hanno indirizzato nella giusta direzione. Ma non scegliere ancora una risposta giusta. Perché sono necessari più test. –