2015-04-16 14 views
7

Uso Retrofit con Observables e desidero concatenare gli osservabili. Di solito funziona bene con funzioni come map() o flatMap(), poiché lo api restituisce un Osservabile che esegue l'operazione. Ma in questo caso devo fare quanto segue:Concatenamento di osservabili RxJava con richiamate/ascoltatori

  1. getKey() dal api
  2. Utilizzare il tasto restituito in un'altra libreria Foo e attendere che il callback per essere chiamato.
  3. Quando viene restituita la richiamata, inviare il risultato a api.

Mi piacerebbe che fosse una chiamata concatenata, quindi devo solo iscrivermi una volta. Immagino di poter usare merge() o join() o qualcosa del genere, ma non ero sicuro di quale sarebbe l'approccio migliore per gestire il callback.

C'è un modo per migliorarlo? Questo è quello che ho finora:

api.getKey().subscribe(new Action1<String>() { 
    @Override 
    public void call(String key) { 
     Foo foo = new Foo(); 
     foo.setAwesomeCallback(new AwesomeCallback() { 
     @Override 
     public void onAwesomeReady(String awesome) { 
      api.sendAwesome(awesome) 
        .subscribe(new Action1<Void>() { 
         @Override 
         public void call(Void aVoid) { 
          handleAwesomeSent(); 
         } 
        }); 
     } 
     }); 
     foo.makeAwesome(); 
    } 
}); 
+0

Dove viene utilizzata la chiave? –

risposta

17

Adattare la soluzione di clemp6r, qui è un altro che non ha bisogno né Subjects né annidati Subscriptions:

api.getKey().flatMap(new Func1<String, Observable<String>>() { 
    @Override 
    public Observable<String> call(String key) { 

     return Observable.create(new Observable.OnSubscribe<String>(){ 

      @Override 
      public void call(final Subscriber<? super String> subscriber) { 
       Foo foo = new Foo(); 
       foo.setAwesomeCallback(new AwesomeCallback() { 
        @Override 
        public void onAwesomeReady(String awesome) { 
         if (! subscriber.isUnsubscribed()) { 
          subscriber.onNext(awesome); 
          subscriber.onComplete(); 
         } 
        } 
       }); 
       foo.makeAwesome(); 
      } 
     }); 
}).flatMap(new Func1<String, Observable<String>>() { 
    @Override 
    public Observable<String> call(String awesome) { 
     return sendAwesome(awesome); 
    } 
}).subscribe(new Action1<Void>() { 
    @Override 
    public void call(Void aVoid) { 
     handleAwesomeSent(); 
    } 
}); 

In generale penso che sia sempre possibilmente per avvolgere qualsiasi operazione asincrona basata sulla richiamata in un Observable utilizzando Observable.create().

+0

Questo è molto utile –

4

Devi usare un PublishSubject per trasformare l'API callback-based per un osservabile.

provare qualcosa di simile che (non testato):

api.getKey().flatMap(new Func1<String, Observable<String>>() { 
    @Override 
    public Observable<String> call(String key) { 
     Foo foo = new Foo(); 
     PublishSubject<String> subject = PublishSubject.create(); 
     foo.setAwesomeCallback(new AwesomeCallback() { 
     @Override 
     public void onAwesomeReady(String awesome) { 
      subject.onNext(awesome); 
      subject.onComplete(); 
     } 
     }); 
     foo.makeAwesome(); 

     return subject; 
    } 
}).flatMap(new Func1<String, Observable<String>>() { 
    @Override 
    public Observable<String> call(String awesome) { 
     return sendAwesome(awesome); 
    } 
}).subscribe(new Action1<Void>() { 
    @Override 
    public void call(Void aVoid) { 
     handleAwesomeSent(); 
    } 
}); 
+0

Dovrai chiamare subject.onComplete() dopo il subject.onNext (awesome) in modo che la catena Observable si completi. – alexwen

+0

Ho modificato la risposta, grazie. – clemp6r

+0

'PublishSubject' non è richiesto, è possibile utilizzare' flatMap' + 'Observable.create()'. –

1
Api api = new Api() { 
    @Override Single<String> getKey() { 
    return Single.just("apiKey"); 
    } 
}; 

api.getKey() 
    .flatMap(key -> Single.<String>create(singleSubscriber -> { 
     Foo foo = new Foo(); 
     foo.setAwesomeCallback(awesome -> { 
      try { singleSubscriber.onSuccess(awesome);} 
      catch (Exception e) { singleSubscriber.onError(e); } 
     }); 
     foo.makeAwesome(); 
    })) 
    .flatMapCompletable(
     awesome -> Completable.create(completableSubscriber -> { 
      try { 
      sendAwesome(awesome); 
      completableSubscriber.onCompleted(); 
      } catch (Exception e) { completableSubscriber.onError(e); } 
     })) 
    .subscribe(this::handleAwesomeSent, throwable -> { }); 

See gist for full anonymous class example

Questa implementazione adatta david.mihola risposta facendo uso di Single e Completable tipi insieme al flatMapCompletable(), pur essendo di tipo sicuro/specifico.