2015-11-06 12 views
5

Sono un principiante in Rxjava. Ho il codice seguente:Rxjava: Iscriviti alla thread specifica

System.out.println("1: " + Thread.currentThread().getId()); 
    Observable.create(new rx.Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subcriber) { 
      System.out.println("2: " + Thread.currentThread().getId()); 
      // query database 
      String result = .... 
      subcriber.onNext(result); 
     } 

    }).subscribeOn(Schedulers.newThread()).subscribe(countResult -> { 
     System.out.println("3: " + Thread.currentThread().getId()); 
    }); 

Ad esempio, l'output sarà:

1: 50
2: 100
3: 100

voglio abbonati eseguiti sul thread che ha id 50. Come posso farlo?

+0

Puoi dirci anche perché vuoi farlo? Ci manca un po 'di contesto. –

risposta

-1

Con RxJava 1.0.15, è possibile applicare toBlocking() prima dello subscribe e tutto verrà eseguito sul thread che ha creato l'intera sequenza di operatori.

+0

Non penso che l'OP voglia rendere il suo codice sincrono. Invece, vuole passare a un determinato thread prima di chiamare ogni iscritto. – FriendlyMikhail

+0

L'operazione menziona il thread 50 che è il thread che inizia il calcolo asincrono. Non c'è altro modo che bloccare il subscribe per tornare allo stesso thread. – akarnokd

+0

Stavo pensando di fornire un 'ExecutorService' a' Schedulers.from (executorService) ', qualcosa come questo: ' '' ExecutorService executorService = Executors.newSingleThreadExecutor (r -> Thread.currentThread()); '' '. Sfortunatamente, si blocca con 'IllegalThreadStateException'. – AndroidEx

-1

Quindi subscribeOn indica quale thread inizierà l'Observable che emette elementi, osservando "interruttori" su un thread per il resto della catena osservabile. Metti un observOn (schedulers.CurrentThread()) proprio prima dell'abbonamento e sarai nel thread in cui questo osservabile viene creato piuttosto che nel thread in cui viene eseguito. Ecco una risorsa che spiega molto bene il threading di rxjava. http://www.grahamlea.com/2014/07/rxjava-threading-examples/

+1

Non esiste uno scheduler di CurrentThread in RxJava e non è possibile reinserire l'esecuzione nello stesso thread in parti diverse della pipeline con più ObserveOn a meno che lo scheduler non sia a thread singolo. – akarnokd

+0

prova Schedulers.immediate(), che "Crea e restituisce un {@link Scheduler} che viene eseguito immediatamente sul thread corrente." – FriendlyMikhail

+0

Che ancora non si attacca al thread di creazione ed è effettivamente un no op. – akarnokd

-1

Credo che @akarnokd abbia ragione. Esegui senza il .subscribeOn(Schedulers.newThread()) in modo che accada in modo sincrono o utilizzare toBlocking() appena prima della sottoscrizione. In alternativa, se si desidera solo tutto ciò accada sullo stesso thread, ma non deve essere la corrente filo allora si può fare questo:

Observable 
    .defer(() -> Observable.create(...)) 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(subscriber); 

defer assicura che la chiamata a create accade sul filo di sottoscrizione .

1

Penso che ci siano due casi. O ne hai bisogno per eseguire il thread dell'interfaccia utente o per la sincronizzazione. Come so, non è possibile chiamare una funzione su un thread specifico, perché quando viene chiamato il metodo è legato al contesto del thread, quindi è impossibile chiamare un metodo da un thread a un altro thread. Il tuo problema è che il metodo in subscriber è chiamato da Schedulers.newThread(). Ho anche trovato questo github issue su Schedulers.currentThread(). Quello di cui hai bisogno è di notificare il thread del chiamante quando viene chiamato l'osservatore.

Inoltre è possibile utilizzare akka, è molto più semplice scrivere codice simultaneo con esso.

Ci scusiamo per la mia cattiva grammatica.

+0

Strano, menzionano AndroidSchedulers.handlerThread, ma non esiste un metodo del genere nel mio rxjava .. – Fen1kz

0

Dal docs:

Per impostazione predefinita, l'osservabile e la catena di operatori che si applica a farà il suo lavoro, e vi comunicheremo i suoi osservatori, sullo stesso thread in cui la sua Iscriviti il metodo è chiamato. L'operatore SubscribeOn modifica questo comportamento specificando un altro Utilità di pianificazione su cui deve essere operato l'Osservatorio . L'operatore ObserveOn specifica un diverso Scheduler che l'Observable utilizzerà per inviare notifiche ai suoi osservatori .

Così si può semplicemente utilizzare subscribe invece di subscribeOn di osservare la vostra collezione sullo stesso thread è stato creato, qualcosa di simile:

Observable.create(new rx.Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subcriber) { 
      System.out.println("2: " + Thread.currentThread().getId()); 
      // query database 
      String result = .... 
      subcriber.onNext(result); 
     } 

    }).subscribe(countResult -> { 
     System.out.println("3: " + Thread.currentThread().getId()); 
    }); 

UPDATE: Se l'applicazione è un Applicazione Android, è possibile utilizzare l'abbonamento su un thread in background come si fa e passare i risultati al thread principale utilizzando i messaggi Handler.

Se l'applicazione è un'applicazione Java, potrei suggerire di utilizzare il meccanismo wait() e notify() o considerare l'uso di framework come EventBus o akka per scenari più complessi.

+0

Il punto è passare il thread e tornare ad esso. – Fen1kz