2015-04-18 13 views
5

Sto sperimentando con RxJava e la classe CompletableFuture di Java 8 e non riesco a capire come gestire le condizioni di timeout.Annulla attività su timeout in RxJava

import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable; 

// ... 

    Observable<String> doSomethingSlowly() { 
     CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> { 
      // this call may be very slow - if it takes too long, 
      // we want to time out and cancel it. 
      return processor.slowExternalCall(); 

     }); 

     return toObservable(task); 
    } 

    // ... 

    doSomethingSlowly() 
     .single() 
     .timeout(3, TimeUnit.SECONDS, Observable.just("timeout")); 

Questo fondamentalmente funziona (se viene raggiunto il timeout di tre secondi, "timeout" è pubblicato). Vorrei tuttavia in aggiunta cancellare il futuro compito che ho inserito in un Observable - è possibile con un approccio RxJava-centrico?

So che un'opzione potrebbe essere gestire il timeout da solo, utilizzando task.get(3, TimeUnit.SECONDS), ma mi chiedo se sia possibile eseguire tutte le operazioni di gestione delle attività in RxJava.

+0

Puoi mostrare come viene implementato il metodo 'toObservable'? –

+1

https://github.com/lukas-krecan/future-converter/tree/master/rxjava-java8 –

risposta

10

Sì, puoi farlo. Aggiungere Subscription allo Subscriber.

Ciò consente di ascoltare le disiscrizioni, che avverranno se si chiama esplicitamente subscribe().unsubscribe() o se Observable viene completato correttamente o con un errore.

Se si vede una disiscrizione prima che il futuro sia completato, è possibile supporre che sia a causa di un esplicito unsubscribe o di un timeout.

public class FutureTest { 
    public static void main(String[] args) throws IOException { 
     doSomethingSlowly() 
       .timeout(1, TimeUnit.SECONDS, Observable.just("timeout")) 
       .subscribe(System.out::println); 
     System.in.read(); // keep process alive 
    } 

    private static Observable<String> doSomethingSlowly() { 
     CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { 
      try { 
       Thread.sleep(2000); 
      } catch (InterruptedException e) { 
      } 
      return "Something"; 

     }); 
     return toObservable(future); 
    } 

    private static <T> Observable<T> toObservable(CompletableFuture<T> future) { 
     return Observable.create(subscriber -> { 
      subscriber.add(new Subscription() { 
       private boolean unsubscribed = false; 
       @Override 
       public void unsubscribe() { 
        if (!future.isDone()){ 
         future.cancel(true); 
        } 
        unsubscribed = true; 
       } 

       @Override 
       public boolean isUnsubscribed() { 
        return unsubscribed; 
       } 
      }); 

      future.thenAccept(value -> { 
       if (!subscriber.isUnsubscribed()){ 
        subscriber.onNext(value); 
        subscriber.onCompleted(); 
       } 
      }).exceptionally(throwable -> { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(throwable); 
       } 
       return null; 
      }); 
     }); 
    } 
} 
+1

Grazie, questo è uno schema utile da sapere. Nel frattempo, ho scoperto che la [semantica di 'CompletableFuture.cancel' è diversa da plain Futures] (http://java.dzone.com/articles/completablefuture-cant-be) - non è possibile interrompere un 'CompletableFuture 'usando il metodo' cancel'. La soluzione che suggerisci, tuttavia, si adatta bene anche ai semplici Futures, quindi vado con quello. – jstaffans