2015-05-05 13 views
6

Sto provando a scrivere un semplice programma utilizzando RxJava per generare una sequenza infinita di numeri naturali. Quindi, ho trovato due modi per generare una sequenza di numeri usando Observable.timer() e Observable.interval(). Non sono sicuro che queste funzioni siano il modo giusto per affrontare questo problema. Mi aspettavo una semplice funzione come quella che abbiamo in Java 8 per generare numeri naturali infiniti.Genera sequenza infinita di numeri naturali usando RxJava

IntStream.iterate (1, valore -> valore +1) .forEach (System.out :: println);

Ho provato a utilizzare IntStream con Observable ma non funziona correttamente. Invia un flusso infinito di numeri solo al primo sottoscrittore. Come posso generare correttamente una sequenza infinita di numeri naturali?

import rx.Observable; 
import rx.functions.Action1; 

import java.util.stream.IntStream; 

public class NaturalNumbers { 

    public static void main(String[] args) { 
     Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
      IntStream stream = IntStream.iterate(1, val -> val + 1); 
      stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber)); 
     }); 

     Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber); 
     Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber); 
     Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber); 
     naturalNumbers.subscribe(first); 
     naturalNumbers.subscribe(second); 
     naturalNumbers.subscribe(third); 

    } 
} 

risposta

3

Il problema è che l'on naturalNumbers.subscribe(first);, il OnSubscribe implementato viene chiamato e si sta facendo un forEach su un flusso infinito, quindi, perché il programma non termina.

Un modo per gestirlo è iscriversi in modo asincrono su un thread diverso. Per visualizzare facilmente i risultati che ha dovuto introdurre un sonno nel trattamento Stream:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
    IntStream stream = IntStream.iterate(1, i -> i + 1); 
    stream.peek(i -> { 
     try { 
      // Added to visibly see printing 
      Thread.sleep(50); 
     } catch (InterruptedException e) { 
     } 
    }).forEach(subscriber::onNext); 
}); 

final Subscription subscribe1 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(first); 
final Subscription subscribe2 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(second); 
final Subscription subscribe3 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(third); 

Thread.sleep(1000); 

System.out.println("Unsubscribing"); 
subscribe1.unsubscribe(); 
subscribe2.unsubscribe(); 
subscribe3.unsubscribe(); 
Thread.sleep(1000); 
System.out.println("Stopping"); 
+0

Grazie Mike per la risposta. Sarebbe diverso se io chiamo il metodo subscribeOn mentre creo Observable invece di chiamarlo tre volte come mostrato nello snippet di codice sopra. L'ho provato e il comportamento è lo stesso, ma voglio ancora confermarlo. – Shekhar

+0

Questo problema è stato identificato correttamente, ma questo è un cattivo consiglio: non dovresti mai usare "subscribeOn" per risolvere questo problema - vedi la mia risposta sul perché. –

+1

Chiamando 'unsubscribe' in questo modo disconnette l'utente, quindi smette di ricevere messaggi, ma non interrompe il ciclo del generatore, che continua a funzionare in modo infinito consumando la potenza della CPU. Vedi la mia risposta su come affrontare entrambi i lati della storia. –

2

Observable.Generate è esattamente l'operatore a risolvere questa classe di problema in modo reattivo. Suppongo anche che questo sia un esempio pedagogico, dal momento che usare un iterabile per questo è probabilmente meglio comunque.

Il codice produce l'intero flusso sulla discussione dell'abbonato. Poiché si tratta di un flusso infinito, la chiamata subscribe non verrà mai completata. A parte questo ovvio problema, l'annullamento della sottoscrizione sarà anche problematico dal momento che non lo stai verificando nel tuo ciclo.

Si desidera utilizzare uno scheduler per risolvere questo problema, sicuramente non utilizzare subscribeOn poiché ciò imporrebbe tutti gli osservatori. Pianificare la consegna di ciascun numero a onNext - e come ultimo passaggio in ogni azione programmata, pianificare il successivo.

In sostanza, questo è ciò che fornisce Observable.generate - ogni iterazione è pianificata sull'utilità di pianificazione fornita (che per impostazione predefinita è una che introduce la concorrenza se non la si specifica). Le operazioni dell'Utilità di pianificazione possono essere annullate ed evitare l'inattività del thread.

Rx.NET risolve in questo modo (in realtà non v'è un modello async/await che è meglio, ma non è disponibile in Java afaik):

static IObservable<int> Range(int start, int count, IScheduler scheduler) 
{ 
    return Observable.Create<int>(observer => 
    { 
     return scheduler.Schedule(0, (i, self) => 
     { 
      if (i < count) 
      { 
       Console.WriteLine("Iteration {0}", i); 
       observer.OnNext(start + i); 
       self(i + 1); 
      } 
      else 
      { 
       observer.OnCompleted(); 
      } 
     }); 
    }); 
} 

Due cose da notare qui:

  • La chiamata to Schedule restituisce un handle di abbonamento restituito all'osservatore
  • Il programma è ricorsivo: il parametro self è un riferimento allo scheduler utilizzato per chiamare l'iterazione successiva. Ciò consente la disiscrizione per annullare l'operazione.

Non sono sicuro di come apparirà in RxJava, ma l'idea dovrebbe essere la stessa.Nuovamente, Observable.generate sarà probabilmente più semplice per te in quanto è stato progettato per occuparsi di questo scenario.

1

Quando si crea infinite sequenze occorre prestare attenzione a:

  1. sottoscrivere e osservare su diversi fili; altrimenti servirai solo per un solo utente
  2. interrompe la generazione di valori non appena termina l'abbonamento; altrimenti loop fuga mangeranno la CPU

Il primo problema è risolto utilizzando subscribeOn(), observeOn() e vari scheduler.

Il secondo problema può essere risolto utilizzando i metodi forniti dalla libreria Observable.generate() o Observable.fromIterable(). Fanno il controllo corretto.

Controllare questo:

Observable<Integer> naturalNumbers = 
     Observable.<Integer, Integer>generate(() -> 1, (s, g) -> { 
      logger.info("generating {}", s); 
      g.onNext(s); 
      return s + 1; 
     }).subscribeOn(Schedulers.newThread()); 
Disposable sub1 = naturalNumbers 
     .subscribe(v -> logger.info("1 got {}", v)); 
Disposable sub2 = naturalNumbers 
     .subscribe(v -> logger.info("2 got {}", v)); 
Disposable sub3 = naturalNumbers 
     .subscribe(v -> logger.info("3 got {}", v)); 

Thread.sleep(100); 

logger.info("unsubscribing..."); 
sub1.dispose(); 
sub2.dispose(); 
sub3.dispose(); 
Thread.sleep(1000); 

logger.info("done");