2015-11-20 14 views
7

c'è comunque da dire a java rx di usare il thread corrente nella funzione observOn? Sto scrivendo il codice per Android Syncadapter e voglio che i risultati vengano osservati nel thread dell'adattatore di sincronizzazione e non nel thread principale.Come osservare il thread chiamante in java rx?

Una chiamata rete di esempio con Retrofit + RX Java sembra qualcosa di simile:

MyRetrofitApi.getInstance().getObjects() 
.subscribeOn(Schedulers.io()) 
.observeOn(<current_thread>) 
.subscribe(new Subscriber<Object>() { 
    //do stuff on the sync adapter thread 

} 

Ho provato ad utilizzare utilizzando

... 
.observeOn(AndroidSchedulers.handlerThread(new Handler(Looper.myLooper()))) 
... 

che è allo stesso modo rx Android crea lo scheduler per il thread principale ma non funziona più appena sostituisco Looper.myLooper() per Looper.getMainLooper().

Potrei usare Schedulers.newThread() ma come il suo complesso codice di sincronizzazione con molte chiamate al server creerò costantemente un nuovo thread solo per sparare nuove chiamate di rete che creano nuovamente nuovi thread per avviare più chiamate di rete . C'è un modo per fare questo? O il mio approccio è completamente sbagliato?

+0

Questo è un po 'speculativa, quindi non sto postando questo come un risposta: Dalla versione 2.0-beta2 in poi, Retrofit non mette più la richiesta di rete su un thread diverso - vedi qui: https://github.com/square/retrofit/commit/38ce2bee70342ac1ab08115d74802d3a54d85511 Quindi, se stai usando una versione corrente di Retrofit dovresti essere in grado di saltare il solo 'subscribeOn' e' observOn' e rimanere sul thread dell'adattatore di sincronizzazione per tutto il tempo. Oppure ho frainteso la tua domanda e vuoi creare nuovi thread, ma dovrebbero semplicemente tornare tutti alla discussione da cui hai iniziato? –

risposta

1

Oh, ho appena trovato questo nel wiki all'indirizzo: https://github.com/ReactiveX/RxAndroid#observing-on-arbitrary-threads

new Thread(new Runnable() { 
    @Override 
    public void run() { 
     final Handler handler = new Handler(); // bound to this thread 
     Observable.just("one", "two", "three", "four", "five") 
       .subscribeOn(Schedulers.newThread()) 
       .observeOn(HandlerScheduler.from(handler)) 
       .subscribe(/* an Observer */) 

     // perform work, ... 
    } 
}, "custom-thread-1").start(); 

Penso che questo dovrebbe funzionare per il vostro caso, troppo - tranne la creazione di un nuovo thread, naturalmente ... Quindi, solo:

final Handler handler = new Handler(); // bound to this thread 
MyRetrofitApi.getInstance().getObjects() 
    .subscribeOn(Schedulers.io()) 
    .observeOn(HandlerScheduler.from(handler)) 
    .subscribe(new Subscriber<Object>() { 
     //do stuff on the sync adapter thread 

    } 
+0

Grazie per la rapida repy. Ho aggiornato Android rx (avevo usato una versione precedente) e ho provato l'esempio. Sfortunatamente non ha funzionato, ma ho creato alcuni test e ho scoperto che onNext() è stato chiamato ma il Sottoscrittore non lo riceve. Solo dopo aver aggiunto Looper.loop() alla fine sembra funzionare (che è un ciclo di infite che elabora il messaggio). Anche lo stesso esatto esempio mi dice che ho bisogno di usare Looper.prepare() prima di creare il gestore. Forse sto facendo qualcosa di fondamentalmente sbagliato o potrebbe essere un problema con Android RX stesso? – tiqz

+0

Temo di non poterti dire cosa sta succedendo qui ... vediamo cosa hanno da dire gli altri ... –

+0

@tiqz 'ha scoperto che onNext() viene chiamato ma non viene ricevuto dal Sottoscrittore' il onNext è il metodo di un abbonato quindi quello che scrivi è un po 'strano per me. In teoria, la risposta di david dovrebbe andare bene. Ti consiglierei di pubblicare informazioni più dettagliate su ciò che stai cercando, in modo che possiamo aiutarti. – Diolor

3

provare a utilizzare Schedulers.immediate()

MyRetrofitApi.getInstance().getObjects() 
.subscribeOn(Schedulers.io()) 
.observeOn(Schedulers.immediate()) 
.subscribe(new Subscriber<Object>() { 
    //do stuff on the sync adapter thread 

} 

La sua descrizione dice: Creates and returns a Scheduler that executes work immediately on the current thread.

NOTA:
penso che sia bene per mantenere tutto il lavoro sul filo del SyncAdapter perché sta già utilizzando un thread diverso

+0

Schedulers.immediate() non funziona per me. Ho bisogno di osservare su GlThread ma si osserva su thread da subscribeOn. Quale potrebbe essere la ragione? –

+0

@DmitriyPuchkov Qual è l'ordine delle funzioni 'subscribeOn' e' observOn'? –

+0

L'ordine è lo stesso di quello che scrivi. Risolvo il mio problema creando un esecutore personalizzato che postava i lanci a GLThread. –