2015-04-18 13 views

Sto sperimentando con RxJava e la classe CompletableFuture di Java 8 e non riesco a capire come gestire le condizioni di timeout.Annulla attività su timeout in RxJava

import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable; 

// ... 

    Observable<String> doSomethingSlowly() { 
     CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> { 
      // this call may be very slow - if it takes too long, 
      // we want to time out and cancel it. 
      return processor.slowExternalCall(); 


     return toObservable(task); 

    // ... 

     .timeout(3, TimeUnit.SECONDS, Observable.just("timeout")); 

Questo fondamentalmente funziona (se viene raggiunto il timeout di tre secondi, "timeout" è pubblicato). Vorrei tuttavia in aggiunta cancellare il futuro compito che ho inserito in un Observable - è possibile con un approccio RxJava-centrico?

So che un'opzione potrebbe essere gestire il timeout da solo, utilizzando task.get(3, TimeUnit.SECONDS), ma mi chiedo se sia possibile eseguire tutte le operazioni di gestione delle attività in RxJava.


Puoi mostrare come viene implementato il metodo 'toObservable'? –


https://github.com/lukas-krecan/future-converter/tree/master/rxjava-java8 –



Sì, puoi farlo. Aggiungere Subscription allo Subscriber.

Ciò consente di ascoltare le disiscrizioni, che avverranno se si chiama esplicitamente subscribe().unsubscribe() o se Observable viene completato correttamente o con un errore.

Se si vede una disiscrizione prima che il futuro sia completato, è possibile supporre che sia a causa di un esplicito unsubscribe o di un timeout.

public class FutureTest { 
    public static void main(String[] args) throws IOException { 
       .timeout(1, TimeUnit.SECONDS, Observable.just("timeout")) 
     System.in.read(); // keep process alive 

    private static Observable<String> doSomethingSlowly() { 
     CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { 
      try { 
      } catch (InterruptedException e) { 
      return "Something"; 

     return toObservable(future); 

    private static <T> Observable<T> toObservable(CompletableFuture<T> future) { 
     return Observable.create(subscriber -> { 
      subscriber.add(new Subscription() { 
       private boolean unsubscribed = false; 
       public void unsubscribe() { 
        if (!future.isDone()){ 
        unsubscribed = true; 

       public boolean isUnsubscribed() { 
        return unsubscribed; 

      future.thenAccept(value -> { 
       if (!subscriber.isUnsubscribed()){ 
      }).exceptionally(throwable -> { 
       if (!subscriber.isUnsubscribed()) { 
       return null; 

Grazie, questo è uno schema utile da sapere. Nel frattempo, ho scoperto che la [semantica di 'CompletableFuture.cancel' è diversa da plain Futures] (http://java.dzone.com/articles/completablefuture-cant-be) - non è possibile interrompere un 'CompletableFuture 'usando il metodo' cancel'. La soluzione che suggerisci, tuttavia, si adatta bene anche ai semplici Futures, quindi vado con quello. – jstaffans