2015-12-13 24 views
8

Di recente ho iniziato a sperimentare con RxJava e sono imbattuto in una presentazione da parte di un ingegnere di Netflix che ha suggerito lo spostamento nostre API di business per le API osservabile, per esempio:Qual è il modo corretto di gestire la transazione nei servizi RxJava?

public interface VideoService { 
    Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic); 
    Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId); 
    Observable<VideoRating> getVideoRating(Integer videoId); 
} 

Tuttavia non ho trovato alcun luogo che ha spiegato come transactionality dovrebbe essere gestito in questi servizi. In un primo momento ho appena annotato la mia implementazione del servizio con @Transactional

@Service 
@Transactional 
public class VideoServiceImpl implements VideoService{ 

    @Autowired 
    private VideoBasicInfoRepository basicInfoRepo; 
    @Autowired 
    private VideoRatingRepository ratingRepo; 

    public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic){ 
     return Observable.create(s -> { 
      s.onNext(basicInfoRepo.save(videBasic)); 
     }); 
    } 

Quello che vorremmo è che l'esecuzione di tutto il codice all'interno del Object.create lambda (s -> { // This code }) è accaduto in una transazione. Tuttavia , ciò che accade è che:

  1. La chiamata a createVideoBasicInfo() esegue in modo transazionale, restituendo l'osservabile freddo.
  2. Il save() viene eseguito come una transazione atomica.

Ovviamente ha senso poiché il proxy Spring si applica ai metodi serviceImpl. Ho pensato di modi per fare quello che mi aspetto come l'avvio di una transazione programmatico:

return Observable.create(s -> { 
    VideoBasicInfo savedBasic = transactionTemplate.execute(status -> { 
     VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasicInfo); 
     return basicInfo; 
    }); 
    s.onNext(savedBasic); 
}); 

È questo il modo consigliato di gestione delle operazioni quando si lavora con le API reattivi?

+0

Qual è la vostra repository? È un'implementazione di Spring Data CrudRepository? Il CrudRepository ha un metodo "salva" contrassegnato come transazionale –

+0

Senza contare che una volta che si utilizza Observables, un utente api può facilmente iscriversi o osservare su thread diversi, e le transazioni Sping e molti altri componenti si basano su un comportamento per thread, usando variabili locali del thread. Usare RxJava con le Transazioni di primavera complesse potrebbe rivelarsi complicato, ma ora che alla Spring 5 viene dato il supporto per Spring Reactor, vorrei imparare come dovrebbe funzionare anche questo. –

risposta

7

primavera dati JpaRepository metodo firme sono già segnate @Transactional, quindi se si utilizza un solo allora non c'è bisogno di fare nulla di speciale:

public interface PersonRepository extends JpaRepository<Person, Integer> { 
} 

@RunWith(SpringJUnit4ClassRunner.class) 
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class}) 
public class PersonRepositoryTest { 
    private PersonRepository personRepository; 

    @Autowired 
    public void setPersonRepository(PersonRepository PersonRepository) { 
     this.personRepository = PersonRepository; 
    } 

    @Test 
    public void testReactiveSavePerson() { 
     Person person = new Person("Jane", "Doe"); 
     assertNull(person.getId()); //null before save 

     //save person 
     Observable.create(s -> { 
      s.onNext(personRepository.save(person)); 
     }).subscribe(); 

     //fetch from DB 
     Person fetchedPerson = personRepository.findOne(person.getId()); 

     //should not be null 
     assertNotNull(fetchedPerson); 

     //should equal 
     assertEquals(person.getId(), fetchedPerson.getId()); 
     assertEquals(person.getFirstName(), fetchedPerson.getFirstName()); 
    } 
} 

Se è necessario combinare più repository in una sola operazione, si potrebbe usare qualcosa come la classe di seguito:

@Component() 
public class ObservableTxFactory { 
    public final <T> Observable<T> create(Observable.OnSubscribe<T> f) { 
     return new ObservableTx<>(this, f); 
    } 

    @Transactional 
    public void call(Observable.OnSubscribe onSubscribe, Subscriber subscriber) { 
     onSubscribe.call(subscriber); 
    } 

    private static class ObservableTx<T> extends Observable<T> { 

     public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) { 
      super(new OnSubscribeDecorator<>(observableTxFactory, f)); 
     } 
    } 

    private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> { 

     private final ObservableTxFactory observableTxFactory; 
     private final Observable.OnSubscribe<T> onSubscribe; 

     OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe<T> s) { 
      this.onSubscribe = s; 
      this.observableTxFactory = observableTxFactory; 
     } 

     @Override 
     public void call(Subscriber<? super T> subscriber) { 
      observableTxFactory.call(onSubscribe, subscriber); 
     } 
    } 
} 

il fagioli fabbrica deve essere definito così:

@Bean 
ObservableTxFactory observableTxFactory() { 
    return new ObservableTxFactory(); 
} 

Servizio:

@Service 
public class PersonService { 
    @Autowired 
    PersonRepository personRepository; 
    @Autowired 
    ObservableTxFactory observableTxFactory; 

    public Observable<Person> createPerson(String firstName, String lastName) { 
     return observableTxFactory.create(s -> { 
      Person p = new Person(firstName, lastName); 
      s.onNext(personRepository.save(p)); 
     }); 
    } 
} 

prova:

@RunWith(SpringJUnit4ClassRunner.class) 
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class}) 
public class PersonServiceTest { 
    @Autowired 
    PersonRepository personRepository; 
    @Autowired 
    ObservableTxFactory observableTxFactory; 

    @Test 
    public void testPersonService() { 
     final PersonService service = new PersonService(); 
     service.personRepository = personRepository; 
     service.observableTxFactory = observableTxFactory; 

     final Observable<Person> personObservable = service.createPerson("John", "Doe"); 
     personObservable.subscribe(); 

     //fetch from DB 
     final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false) 
       .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe")) 
       .findFirst() 
       .get(); 

     //should not be null 
     assertNotNull(fetchedPerson); 
    } 

} 

Schermata che mostra delega: enter image description here

+1

Ciao John, sono Spring Data JpaRepositories. Tuttavia, quello che voglio ottenere è la transazione nel livello di servizio, vale a dire chiamare più DAO all'interno della stessa transazione. Può essere fatto con transactionTemplate ma mi chiedevo se c'è un'alternativa – codependent

+0

JpaRepository è una sottoclasse di CrudRepository; in questo esempio il suo effetto è lo stesso. Quindi vuoi che diversi repository siano coinvolti nella stessa transazione? Questo non è il problema indicato nella tua domanda ... Hai scritto: "Quello che vorremmo è che l'esecuzione di BasicInfoRepo.save() avvenga in una transazione." –

+0

Ho aggiornato il codice per mostrare l'implementazione del servizio e il test –

0

Mi piacerebbe avere l'eccellente risposta John Scattergood.Il mio utilizzo tipico è con il Observable.fromCallable() quindi ero alla ricerca di un modo per farlo, invece di attuare il Observable.OnSubscribe così ho adattato la sua tecnica in modo che si può utilizzare passando un

Classe Callable fabbrica:

@Component 
public class ObservableTxFactory { 
    public final <T> Observable.OnSubscribe<T> createFromCallable(Callable<? extends T> resultFactory) { 
     return new OnSubscribeDecorator<>(this, resultFactory); 
    } 

    @SuppressWarnings("unchecked") 
    @Transactional 
    public <T> void call(Callable<? extends T> resultFactory, Subscriber subscriber) { 
     final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<>(subscriber); 

     subscriber.setProducer(singleDelayedProducer); 

     try { 
      singleDelayedProducer.setValue(resultFactory.call()); 
     } catch (Throwable t) { 
      Exceptions.throwOrReport(t, subscriber); 
     } 
    } 

    private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> { 

     private final ObservableTxFactory observableTxFactory; 
     private final Callable<? extends T> resultFactory; 

     OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, Callable<? extends T> resultFactory) { 
      this.resultFactory = resultFactory; 
      this.observableTxFactory = observableTxFactory; 
     } 

     @Override 
     public void call(Subscriber<? super T> subscriber) { 
      observableTxFactory.call(resultFactory, subscriber); 
     } 
    } 
} 

Codice originale:

Observable.fromCallable(() -> fooRepository.findOne(fooID)); 

Nuovo Codice:

Observable.create(observableTxFactory.createFromCallable(() -> fooRepository.findOne(fooID))); 

Assicurarsi che il metodo si aggiunge @Transactional su è public altrimenti Spring AOP non sarà in grado di consigliare è