2015-11-25 5 views
19

Ho alcuni CompletableFuture s e voglio eseguirli in parallelo, in attesa del primo che restituisce normalmente.CompletableFuture: in attesa del primo ritorno normale?

So che posso usare CompletableFuture.anyOf di aspettare la prima di tornare, ma questa tornerà normalmente o eccezionalmente. Voglio ignorare le eccezioni.

List<CompletableFuture<?>> futures = names.stream().map(
    (String name) -> 
    CompletableFuture.supplyAsync(
    () -> 
     // this calling may throw exceptions. 
     new Task(name).run() 
    ) 
).collect(Collectors.toList()); 
//FIXME Can not ignore exceptionally returned takes. 
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{})); 
try { 
    logger.info(any.get().toString()); 
} catch (Exception e) { 
    e.printStackTrace(); 
} 

risposta

0

Bene, questo è un metodo che dovrebbe essere supportato dal framework. Innanzitutto, ho pensato che lo CompletionStage.applyToEither facesse qualcosa di simile, ma risulta che non lo fa. Così mi si avvicinò con questa soluzione:

public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) { 
    final int count = stages.size(); 
    if (count <= 0) { 
    throw new IllegalArgumentException("stages must not be empty"); 
    } 
    final AtomicInteger settled = new AtomicInteger(); 
    final CompletableFuture<U> future = new CompletableFuture<U>(); 
    BiConsumer<U, Throwable> consumer = (val, exc) -> { 
    if (exc == null) { 
     future.complete(val); 
    } else { 
     if (settled.incrementAndGet() >= count) { 
     // Complete with the last exception. You can aggregate all the exceptions if you wish. 
     future.completeExceptionally(exc); 
     } 
    } 
    }; 
    for (CompletionStage<U> item : stages) { 
    item.whenComplete(consumer); 
    } 
    return future; 
} 

per vederlo in azione, ecco alcune utilizzo:

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.CompletionStage; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.function.BiConsumer; 

public class Main { 
    public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) { 
    final int count = stages.size(); 
    if (count <= 0) { 
     throw new IllegalArgumentException("stages must not be empty"); 
    } 
    final AtomicInteger settled = new AtomicInteger(); 
    final CompletableFuture<U> future = new CompletableFuture<U>(); 
    BiConsumer<U, Throwable> consumer = (val, exc) -> { 
     if (exc == null) { 
     future.complete(val); 
     } else { 
     if (settled.incrementAndGet() >= count) { 
      // Complete with the last exception. You can aggregate all the exceptions if you wish. 
      future.completeExceptionally(exc); 
     } 
     } 
    }; 
    for (CompletionStage<U> item : stages) { 
     item.whenComplete(consumer); 
    } 
    return future; 
    } 

    private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor(); 

    public static <U> CompletionStage<U> delayed(final U value, long delay) { 
    CompletableFuture<U> future = new CompletableFuture<U>(); 
    worker.schedule(() -> { 
     future.complete(value); 
    }, delay, TimeUnit.MILLISECONDS); 
    return future; 
    } 
    public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) { 
    CompletableFuture<U> future = new CompletableFuture<U>(); 
    worker.schedule(() -> { 
     future.completeExceptionally(value); 
    }, delay, TimeUnit.MILLISECONDS); 
    return future; 
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
    System.out.println("Started..."); 

    /* 
    // Looks like applyToEither doesn't work as expected 
    CompletableFuture<Integer> a = CompletableFuture.completedFuture(99); 
    CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture(); 
    System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc 
    */ 

    try { 
     List<CompletionStage<Integer>> futures = new ArrayList<>(); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200)); 
     futures.add(delayed(1, 1000)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400)); 
     futures.add(delayed(2, 500)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600)); 
     Integer value = firstCompleted(futures).toCompletableFuture().get(); 
     System.out.println("Completed normally: " + value); 
    } catch (Exception ex) { 
     System.out.println("Completed exceptionally"); 
     ex.printStackTrace(); 
    } 

    try { 
     List<CompletionStage<Integer>> futures = new ArrayList<>(); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200)); 
     Integer value = firstCompleted(futures).toCompletableFuture().get(); 
     System.out.println("Completed normally: " + value); 
    } catch (Exception ex) { 
     System.out.println("Completed exceptionally"); 
     ex.printStackTrace(); 
    } 

    System.out.println("End..."); 
    } 

} 
7

È possibile utilizzare il seguente metodo di supporto:

public static <T> 
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) { 

    CompletableFuture<T> f=new CompletableFuture<>(); 
    Consumer<T> complete=f::complete; 
    l.forEach(s -> s.thenAccept(complete)); 
    return f; 
} 

cui si può utilizzare in questo modo, per dimostrare che ignorerà le eccezioni precedenti ma restituirà il primo valore fornito:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
     () -> { throw new RuntimeException("failing immediately"); } 
    ), 
    CompletableFuture.supplyAsync(
     () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); 
      return "with 5s delay"; 
     }), 
    CompletableFuture.supplyAsync(
     () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10)); 
      return "with 10s delay"; 
     }) 
); 
CompletableFuture<String> c = anyOf(futures); 
logger.info(c.join()); 

Uno svantaggio di questa soluzione è che sarà mai completo se tutti i futures completano eccezionalmente. Una soluzione, che fornirà il primo valore se esiste una computazione successo ma sicuro eccezionalmente se non c'è calcolo successo a tutti, è un po 'più complesso:

public static <T> 
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) { 

    CompletableFuture<T> f=new CompletableFuture<>(); 
    Consumer<T> complete=f::complete; 
    CompletableFuture.allOf(
     l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new) 
    ).exceptionally(ex -> { f.completeExceptionally(ex); return null; }); 
    return f; 
} 

Esso utilizza il fatto che gestore eccezionalmente allOf s' viene invocato solo dopo che tutti i futures sono stati completati (in via eccezionale o meno) e che un futuro può essere completato una sola volta (lasciando da parte cose speciali come obtrude…). Quando viene eseguito l'eccezionalmente gestore, qualsiasi tentativo di completare il futuro con un risultato è stato eseguito, se ce n'era uno, quindi il tentativo di completarlo eccezionalmente ha esito positivo solo se non è stato completato con successo.

Può essere utilizzato esattamente nello stesso modo come la prima soluzione e mostrano solo il comportamento diverso se tutti i calcoli falliscono, ad esempio:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
     () -> { throw new RuntimeException("failing immediately"); } 
    ), 
    CompletableFuture.supplyAsync(
     // delayed to demonstrate that the solution will wait for all completions 
     // to ensure it doesn't miss a possible successful computation 
     () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); 
      throw new RuntimeException("failing later"); } 
    ) 
); 
CompletableFuture<String> c = anyOf(futures); 
try { logger.info(c.join()); } 
catch(CompletionException ex) { logger.severe(ex.toString()); } 

L'esempio precedente utilizza un ritardo dimostrando che la soluzione attenderà tutti completamenti quando non c'è successo, mentre this example on ideone dimostrerà come un successo successivo trasformerà il risultato in successo. Nota che a causa della memorizzazione dei risultati nella cache di Ideones potresti non notare il ritardo.

Si noti che nel caso in cui tutti i futures falliscano, non c'è garanzia su quale delle eccezioni verrà segnalata. Dal momento che attende tutti i completamenti nel caso errato, ognuno potrebbe arrivare al risultato finale.

+0

Cerchiamo [Continuiamo questa discussione in videochat] (http://chat.stackoverflow.com/rooms/97948/discussion-between-basilevs-and-holger). – Basilevs

+0

@Basilevs: ho ampliato la risposta – Holger

4

Considerato che:

  1. Uno dei fondamenti della filosofia di Java è quello di prevenire o scoraggiare pratiche di programmazione cattive.

    (Fino a che punto si è riuscito a fare ciò è oggetto di un altro dibattito,. Il punto si trova ancora che questo è stato senza dubbio uno degli obiettivi primari della lingua)

  2. eccezioni Ignorando è una pessima pratica

    Un'eccezione deve sempre essere sia rilanciati allo strato superiore, oppure manipolati, o perlomeno segnalato. In particolare, un'eccezione dovrebbe non essere mai inghiottita silenziosamente.

  3. Gli errori devono essere segnalati al più presto possibile.

    per esempio, vedere i dolori del runtime passa attraverso al fine di fornire fallire veloci iteratori che gettano un ConcurrentModificationException se la raccolta viene modificato durante l'iterazione.

  4. Ignorando un eccezionalmente completato CompletableFuture significa che a) non si sta segnalando un errore nel più breve tempo possibile, e b) si è probabile pianificazione di non segnalarlo a tutti.

  5. L'incapacità di aspettare semplicemente il primo completamento non eccezionale e invece di dover essere disturbati da completamenti eccezionali non impone alcun onere significativo, perché è sempre possibile rimuovere l'elemento completato eccezionalmente dall'elenco, (mentre al Allo stesso tempo non dimenticare di segnalare il fallimento, giusto?) e ripetere l'attesa.

Vorrei quindi non stupitevi se il ricercato per la funzione è intenzionalmente manca da Java, e sarei disposto a sostenere che si tratta di giustamente mancante.

(Sorry Sotirios, nessuna risposta canonica.)

+0

Consideriamo fonti alternative di informazioni (ad esempio un backup hot-swap o un cluster di bilanciamento del carico). Se le fonti sono intercambiabili, conosciute per fallire occasionalmente e richiedono molto tempo per rispondere, è perfettamente legale e desiderabile ignorare alcuni errori. – Basilevs

+0

@Basilevs true, ma sarebbe comunque meglio registrarli e ignorare i messaggi di registro. Fallimenti di qualsiasi tipo senza registrazione di essi di qualsiasi tipo non sono una buona idea. –

-2

Sarà questo lavoro? Restituisce un flusso di tutti i futures completati normalmente e ne restituisce uno.

futures.stream() 
    .filter(f -> { 
    try{ 
     f.get(); 
     return true; 
    }catch(ExecutionException | InterruptedException e){ 
     return false; 
    } 
    }) 
    .findAny();