2016-05-02 38 views
9

Ho un listener di messaggi AMQP di primavera in esecuzione.Come chiedere a RabbitMQ di riprovare quando l'attività commerciale si verifica in Spring Asynchronous MessageListener use case

public class ConsumerService implements MessageListener { 

    @Autowired 
    RabbitTemplate rabbitTemplate; 

    @Override 
    public void onMessage(Message message) { 
     try { 
      testService.process(message); //This process method can throw Business Exception 
     } catch (BusinessException e) { 
      //Here we can just log the exception. How the retry attempt is made? 
     } catch (Exception e) { 
      //Here we can just log the exception. How the retry attempt is made? 
     } 
    } 
} 

Come si può vedere, potrebbe verificarsi un'eccezione durante il processo. Voglio riprovare a causa di un particolare errore nel blocco Catch. Non posso attraverso l'eccezione in onMessage. Come dire a RabbitMQ che c'è un'eccezione e riprovare?

+0

Ho modificato la mia risposta per coprire la tua domanda specifica. Fammi sapere se hai bisogno di ulteriori chiarimenti. –

risposta

10

Poiché onMessage() non consente di generare eccezioni controllate, è possibile avvolgere l'eccezione in un RuntimeException e riavviarlo.

try { 
    testService.process(message); 
} catch (BusinessException e) { 
    throw new RuntimeException(e); 
} 

noti comunque che questo può causare il messaggio di essere ri-consegnato a tempo indeterminato. Ecco come funziona:

RabbitMQ supporta il rifiuto di un messaggio e chiede al broker di riaccenderlo. Questo è mostrato here. Ma RabbitMQ non ha un meccanismo nativo per riprovare i criteri, ad es. impostazione di tentativi massimi, ritardo, ecc.

Quando si utilizza Spring AMQP, "requeue on reject" è l'opzione predefinita. Spring's SimpleMessageListenerContainer eseguirà questa operazione quando c'è un'eccezione non gestita. Quindi nel tuo caso hai solo bisogno di rilanciare l'eccezione catturata. Si noti tuttavia che se non è possibile elaborare un messaggio e si lancia sempre l'eccezione, questo verrà restituito a tempo indeterminato e genererà un ciclo infinito.

È possibile sovrascrivere questo comportamento per messaggio generando un'eccezione AmqpRejectAndDontRequeueException, nel qual caso il messaggio non verrà riacceso.

È inoltre possibile disattivare la "riaccodamento sulla rifiutare" il comportamento di SimpleMessageListenerContainer interamente impostando

container.setDefaultRequeueRejected(false) 

Quando un messaggio viene rifiutato e non riaccodato sarà o essere perso o trasferito ad un DLQ, se uno è impostato su RabbitMQ.

Se avete bisogno di una politica di tentativi con numero massimo di tentativi, delay, ecc il modo più semplice è quello di impostare una molla "stateless" RetryOperationsInterceptor che farà tutti i tentativi all'interno del filo (utilizzando Thread.sleep()) senza rifiutare il messaggio su ogni tentativo (quindi senza tornare a RabbitMQ per ogni nuovo tentativo). Quando i tentativi sono esauriti, per impostazione predefinita verrà registrato un avviso e il messaggio verrà consumato. Se si desidera inviare a un DLQ è necessario un RepublishMessageRecoverer o un MessageRecoverer personalizzato che rifiuta il messaggio senza riaccodamento (in quest'ultimo caso si dovrebbe anche setup un RabQQ RabbitMQ in coda). Esempio con messaggio predefinito recuperatore:

container.setAdviceChain(new Advice[] { 
     org.springframework.amqp.rabbit.config.RetryInterceptorBuilder 
       .stateless() 
       .maxAttempts(5) 
       .backOffOptions(1000, 2, 5000) 
       .build() 
}); 

Questo ovviamente ha lo svantaggio che si occuperà la discussione per tutta la durata dei tentativi. Hai anche la possibilità di usare uno "stateful" RetryOperationsInterceptor, che invierà il messaggio a RabbitMQ per ogni tentativo, ma il ritardo sarà comunque implementato con Thread.sleep() all'interno dell'applicazione, inoltre l'impostazione di un intercettore stateful è un po 'più complicata.

Pertanto, se si desidera eseguire tentativi con ritardi senza occupare uno Thread, sarà necessaria una soluzione personalizzata molto più coinvolta che utilizza TTL nelle code RabbitMQ. Se non vuoi il backoff esponenziale (quindi il ritardo non aumenta ad ogni nuovo tentativo) è un po 'più semplice. Per implementare tale soluzione, in pratica crei un'altra coda su rabbitMQ con argomenti: "x-message-ttl": <delay time in milliseconds> e "x-dead-letter-exchange":"<name of the original queue>".Quindi nella coda principale si imposta "x-dead-letter-exchange":"<name of the queue with the TTL>". Così ora, quando rifiuti e non riattivi un messaggio, RabbitMQ lo reindirizzerà alla seconda coda. Quando TTL scade, verrà reindirizzato alla coda originale e quindi riconsegnato all'applicazione. Quindi ora hai bisogno di un nuovo intercettore che rifiuti il ​​messaggio a RabbitMQ dopo ogni errore e tenga traccia del conteggio dei tentativi. Per evitare la necessità di mantenere lo stato nell'applicazione (poiché se l'applicazione è in cluster è necessario replicare lo stato) è possibile calcolare il conteggio tentativi dall'intestazione x-death impostata da RabbitMQ. Vedi maggiori informazioni su questa intestazione here. Quindi a quel punto l'implementazione di un intercettore personalizzato è più semplice della personalizzazione dell'intercettore di stato Spring con questo comportamento.

Controllare anche the section about retries in the Spring AMQP reference.

+0

Thans Nazreet per la tua spiegazione molto dettagliata. Ho ancora qualche dubbio. È chiaro che devo lanciare un'eccezione di runtime per andare a riprovare. Utilizzare anche RetryInterceptor che limita i tentativi di nuovo tentativo. 1. In quali casi andremmo con Intercettatore di stato/Interceptor senza stato? 2. Inoltre, con l'opzione Riprova configurata, gli altri messaggi rimangono incustoditi fino al completamento di tutti i tentativi? Dovrebbe essere un thread separato per riprovare rt? 3. Che ne dici di creare una coda separata per riprovare e ripetere l'implementazione personalizzata? – Santosh

+0

Sono contento che sia stato utile. Per rispondere alle tue domande: 1) Gli interceptor di prova stateful/stateless sono generalmente utilizzati in primavera (batch, integrazione, amqp, ecc.) In modo che abbiano vari casi d'uso. Ma nel caso dei consumatori del messaggio AMQP, non credo che lo stato fornisca un beneficio molto grande, come ho detto. La differenza è che stateful invierà il messaggio a RabbitMQ ad ogni nuovo tentativo (ma dopo aver dormito, se il backoff è configurato). –

+0

2) Ciò che accadrà con gli altri messaggi dipende dal numero di utenti concorrenti. Ogni consumatore è un thread separato. Puoi impostarlo con container.setConcurrentConsumers(). 3) L'implementazione di nuovi tentativi personalizzati è abbastanza complessa da spiegare in un commento. Proverò a trovare il tempo per spiegare dopo. –