Poiché onMessage()
non consente di generare eccezioni controllate, è possibile avvolgere l'eccezione in un RuntimeException
e riavviarlo.
try {
testService.process(message);
} catch (BusinessException e) {
throw new RuntimeException(e);
}
noti comunque che questo può causare il messaggio di essere ri-consegnato a tempo indeterminato. Ecco come funziona:
RabbitMQ supporta il rifiuto di un messaggio e chiede al broker di riaccenderlo. Questo è mostrato here. Ma RabbitMQ non ha un meccanismo nativo per riprovare i criteri, ad es. impostazione di tentativi massimi, ritardo, ecc.
Quando si utilizza Spring AMQP, "requeue on reject" è l'opzione predefinita. Spring's SimpleMessageListenerContainer
eseguirà questa operazione quando c'è un'eccezione non gestita. Quindi nel tuo caso hai solo bisogno di rilanciare l'eccezione catturata. Si noti tuttavia che se non è possibile elaborare un messaggio e si lancia sempre l'eccezione, questo verrà restituito a tempo indeterminato e genererà un ciclo infinito.
È possibile sovrascrivere questo comportamento per messaggio generando un'eccezione AmqpRejectAndDontRequeueException
, nel qual caso il messaggio non verrà riacceso.
È inoltre possibile disattivare la "riaccodamento sulla rifiutare" il comportamento di SimpleMessageListenerContainer
interamente impostando
container.setDefaultRequeueRejected(false)
Quando un messaggio viene rifiutato e non riaccodato sarà o essere perso o trasferito ad un DLQ, se uno è impostato su RabbitMQ.
Se avete bisogno di una politica di tentativi con numero massimo di tentativi, delay, ecc il modo più semplice è quello di impostare una molla "stateless" RetryOperationsInterceptor
che farà tutti i tentativi all'interno del filo (utilizzando Thread.sleep()
) senza rifiutare il messaggio su ogni tentativo (quindi senza tornare a RabbitMQ per ogni nuovo tentativo). Quando i tentativi sono esauriti, per impostazione predefinita verrà registrato un avviso e il messaggio verrà consumato. Se si desidera inviare a un DLQ è necessario un RepublishMessageRecoverer
o un MessageRecoverer
personalizzato che rifiuta il messaggio senza riaccodamento (in quest'ultimo caso si dovrebbe anche setup un RabQQ RabbitMQ in coda). Esempio con messaggio predefinito recuperatore:
container.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 5000)
.build()
});
Questo ovviamente ha lo svantaggio che si occuperà la discussione per tutta la durata dei tentativi. Hai anche la possibilità di usare uno "stateful" RetryOperationsInterceptor
, che invierà il messaggio a RabbitMQ per ogni tentativo, ma il ritardo sarà comunque implementato con Thread.sleep()
all'interno dell'applicazione, inoltre l'impostazione di un intercettore stateful è un po 'più complicata.
Pertanto, se si desidera eseguire tentativi con ritardi senza occupare uno Thread
, sarà necessaria una soluzione personalizzata molto più coinvolta che utilizza TTL nelle code RabbitMQ. Se non vuoi il backoff esponenziale (quindi il ritardo non aumenta ad ogni nuovo tentativo) è un po 'più semplice. Per implementare tale soluzione, in pratica crei un'altra coda su rabbitMQ con argomenti: "x-message-ttl": <delay time in milliseconds>
e "x-dead-letter-exchange":"<name of the original queue>"
.Quindi nella coda principale si imposta "x-dead-letter-exchange":"<name of the queue with the TTL>"
. Così ora, quando rifiuti e non riattivi un messaggio, RabbitMQ lo reindirizzerà alla seconda coda. Quando TTL scade, verrà reindirizzato alla coda originale e quindi riconsegnato all'applicazione. Quindi ora hai bisogno di un nuovo intercettore che rifiuti il messaggio a RabbitMQ dopo ogni errore e tenga traccia del conteggio dei tentativi. Per evitare la necessità di mantenere lo stato nell'applicazione (poiché se l'applicazione è in cluster è necessario replicare lo stato) è possibile calcolare il conteggio tentativi dall'intestazione x-death
impostata da RabbitMQ. Vedi maggiori informazioni su questa intestazione here. Quindi a quel punto l'implementazione di un intercettore personalizzato è più semplice della personalizzazione dell'intercettore di stato Spring con questo comportamento.
Controllare anche the section about retries in the Spring AMQP reference.
Ho modificato la mia risposta per coprire la tua domanda specifica. Fammi sapere se hai bisogno di ulteriori chiarimenti. –