Sto usando RxJava 1.1 a comporre una sequenza osservabile da all'interno di un'applicazione primavera che è simile al seguente:transazione rollback in un'applicazione reattivo
@Transaction
public Observable<Event> create(Event event) {
return Observable.just(event)
.flatMap(event -> {
//save event to db (blocking JPA operation)
Event event = eventRepository.save(event);
return Observable.just(event);
})
//async REST call to service A
.flatMap(this::sendEventToServiceA) <---- may execute on different thread
//async REST call to service B
.flatMap(this::sendEventToServiceB) <---- may execute on different thread
.doOnError(throwable -> {
// ? rollback initally created transaction?
})
}
Un evento raggiunge il livello di servizio della mia applicazione da qualche classe controller e questo si propaga attraverso una catena di operazioni costruite con la funzione flatMap() di RxJava. L'evento viene prima archiviato nel database (Spring Data) e le successive due richieste HTTP asincrone vengono eseguite una dopo l'altra utilizzando la libreria Spring's AsyncRestTemplate dietro le quinte.
In caso di errore/eccezione in qualsiasi punto della pipeline, vorrei poter eseguire il rollback della transazione del database in modo che l'evento NON sia memorizzato nel database. Ho trovato che non è facile da fare poiché in primavera il contesto della transazione è associato al particolare thread di esecuzione. Pertanto, se il codice raggiunge il callback doOnError su un thread diverso (AsyncRestTemplate utilizza il proprio AsyncTaskExecutor), non è possibile eseguire il rollback della transazione inizialmente creata.
Si può consigliare qualsiasi meccanismo per realizzare transazioni attraverso un'applicazione multi-thread composta da diverse operazioni asincrone scritte in questo modo?
Ho anche cercato di creare una transazione a livello di codice con:
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
e poi inviare l'oggetto transactionStatus insieme con l'evento attraverso il gasdotto, ma ancora una volta, quando si verifica un errore e invoco "platformTransactionManager.rollback (stato); ", ottengo" la sincronizzazione delle transazioni non è attiva "poiché questo è in esecuzione su un thread diverso, immagino.
p.s. I metodi sendEventToServiceA/sendEventToServiceB simile al seguente:
public Observable<Event> sendEventToServiceA(event) {
..........
ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
"/serviceA/create?event_id=" + event.id,
HttpMethod.POST, requestEntity, String.class);
return ObservableUtil.toRxObservable(listenableFuture);
}
Grazie Dave! La tua soluzione con lo schedulatore sembra funzionare correttamente. – odybour
Un problema minore che ho riscontrato è che il metodo flatMap che esegue il salvataggio nel database verrà eseguito su un thread diverso rispetto a quello che ha creato la transazione in primo luogo a causa dell'annotazione. Per ovviare a ciò ho creato la transazione a livello di codice all'interno del metodo flatMap appena prima dell'operazione di salvataggio, quindi ho memorizzato la transazione su un oggetto di contesto che ho passato alla pipeline osservabile e all'interno di doOnError faccio qualcosa del tipo: 'transactionManager.rollback (context. getTransaction()); '. – odybour