2016-06-06 24 views
6

sto imparando circa operatore RxJava, e ho trovato questi codice qui sotto non è stato stampato nulla:Come utilizzare RxJava Interval Operatore

public static void main(String[] args) { 

    Observable 
    .interval(1, TimeUnit.SECONDS) 
    .subscribe(new Subscriber<Long>() { 
     @Override 
     public void onCompleted() { 
      System.out.println("onCompleted"); 
     } 

     @Override 
     public void onError(Throwable e) { 
      System.out.println("onError -> " + e.getMessage()); 
     } 

     @Override 
     public void onNext(Long l) { 
      System.out.println("onNext -> " + l); 
     } 
    }); 
} 

Come ReactiveX, interval

creare un osservabile che emette una sequenza di interi intervallati da un particolare intervallo di tempo

Ho fatto un errore o ho dimenticato qualcosa?

risposta

7

Bisogna bloccare fino a quando il osservabile viene consumato:

public static void main(String[] args) throws Exception { 

    CountDownLatch latch = new CountDownLatch(1); 

    Observable 
    .interval(1, TimeUnit.SECONDS) 
    .subscribe(new Subscriber<Long>() { 
     @Override 
     public void onCompleted() { 
      System.out.println("onCompleted"); 
      // make sure to complete only when observable is done 
      latch.countDown(); 
     } 

     @Override 
     public void onError(Throwable e) { 
      System.out.println("onError -> " + e.getMessage()); 
     } 

     @Override 
     public void onNext(Long l) { 
      System.out.println("onNext -> " + l); 
     } 
    }); 

    // wait for observable to complete (never in this case...) 
    latch.await(); 
} 

È possibile aggiungere .take(10) per esempio per vedere la completa osservabile.

+0

Bello, un chiaro esempio di CountdownLatch da avviare. – mtyson

+0

in Rxjava 2, esiste un modo per utilizzare l'intervallo con singolo? –

2

Metti il ​​Thread.sleep(1000000) dopo l'iscrizione e vedrai che funziona. Observable.interval opera per impostazione predefinita su Schedulers.computation() in modo che lo streaming venga eseguito su un thread diverso dal thread principale.

+0

Sì, è corretto. Operatore di intervallo che funziona in modo asincrono, quindi ho bisogno di bloccare e attendere per ottenere risultati da esso. Grazie! –

0

Come ti dicono già che l'intervallo funziona in modo asincrono, quindi devi aspettare che tutti gli eventi finiscano.

È possibile ottenere l'abbonamento una volta sottoscritto e quindi utilizzare TestSubcriber, che fa parte della piattaforma reactiveX, e che consente di attendere che tutti gli eventi vengano interrotti.

 @Test 
public void testObservableInterval() throws InterruptedException { 
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS) 
       .map(time-> "item emitted") 
       .subscribe(System.out::print, 
         item -> System.out.print("final:" + item)); 
    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 

ho nel mio github altri esempi, se avete bisogno https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java