2013-07-16 11 views
5

Ci sono alcuni buoni consigli su come combinare i futures with timeouts. Comunque sono curioso come fare questo con sequenceOfFutures sequenza futuriScala sequenza futura e gestione del timeout

Il mio primo approccio si presenta così

import scala.concurrent._ 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits._ 

object FutureSequenceScala extends App { 
    println("Creating futureList") 

    val timeout = 2 seconds 
    val futures = List(1000, 1500, 1200, 800, 2000) map { ms => 
    val f = future { 
     Thread sleep ms 
     ms toString 
    } 
    Future firstCompletedOf Seq(f, fallback(timeout)) 
    } 

    println("Creating waitinglist") 
    val waitingList = Future sequence futures 
    println("Created") 

    val results = Await result (waitingList, timeout * futures.size) 
    println(results) 

    def fallback(timeout: Duration) = future { 
    Thread sleep (timeout toMillis) 
    "-1" 
    } 
} 

c'è un modo migliore per gestire i timeout in una sequenza di futures o si tratta di un soluzione valida?

risposta

8

Qui ci sono alcune cose nel codice che potresti voler riconsiderare. Per cominciare, non sono un grande fan di inviare attività nello ExecutionContext che hanno il solo scopo di simulare un timeout e hanno anche Thread.sleep utilizzato in esse. La chiamata sleep sta bloccando e probabilmente si desidera evitare di avere un'attività nel contesto di esecuzione che blocca puramente per il solo scopo di attendere un intervallo di tempo prestabilito. Sto per rubare dalla mia risposta here e suggerisco che per la pura gestione del timeout, dovresti usare qualcosa come ho delineato in quella risposta. Il HashedWheelTimer è un'implementazione del timer altamente efficiente che è più adatta alla gestione del timeout rispetto a un'attività che dorme.

Ora, se si va su quella rotta, la prossima modifica suggerirei riguarda la gestione dei singoli guasti legati al timeout per ciascun futuro. Se si desidera che un singolo errore non riesca completamente, l'aggregato Future viene restituito dalla chiamata sequence, quindi non fare nulla di più. Se non si desidera che ciò accada, e invece vogliono un timeout di tornare un valore di default, invece, allora è possibile utilizzare recover sul Future come questo:

withTimeout(someFuture).recover{ 
    case ex:TimeoutException => someDefaultValue 
} 

Una volta fatto questo, si può prendere vantaggio dei callback non-blocking e fare qualcosa del genere:

waitingList onComplete{ 
    case Success(results) => //handle success 
    case Failure(ex) => //handle fail 
} 

Ogni futuro ha un timeout e quindi non sarà solo correre all'infinito. Non è necessario IMO bloccare qui e fornire un ulteriore livello di gestione del timeout tramite il parametro atMost a Await.result. Ma immagino che questo presupponga che tu stia bene con l'approccio non bloccante. Se hai davvero bisogno di bloccare lì, allora non dovresti aspettare il tempo di timeout * futures.size. Questi futuri stanno funzionando in parallelo; il timeout dovrebbe solo essere lungo quanto i singoli timeout per il futuro stesso (o solo leggermente più lungo per tenere conto di eventuali ritardi nella CPU/tempistica). Certamente non dovrebbe essere il timeout * il numero totale di futuri.

+0

Come curiosità, come 'HashedWheelTimer' è più efficiente di' TimerTask' o 'newScheduledThreadPoolExecutor'? Entrambi fanno lo stesso lavoro. – Jatin

+0

@ Jatin, suppongo che puoi dare un'occhiata a questo link per maggiori informazioni: http://stackoverflow.com/questions/15347600/which-is-more-efficient-nettys-hashedwheeltimer-or-quartzs-scheduler. Ma nel suo cuore, l'aggiunta di più attività non dovrebbe consumare più risorse. Dovrebbe essere un timer basato sul tempo più costante (in termini di risorse di sistema consumato), quindi qualcosa come un 'Timer' e' TimerTask'. Per un sistema ad alto throughput in cui si pianificheranno un sacco di attività basate sul timeout di breve durata, è una soluzione migliore a causa delle richieste di utilizzo costante delle risorse. – cmbaxter

+0

Ma come 'STPE' con' coresize' '1' consuma più risorse rispetto a' HashedWheelTimer'? Mi dispiace ma non capisco. 'STPE' ha più tempo di inserimento a causa dell'heap interno' O (log (n)) 'ma meno tempo di tick. Può spiegare, per favore, – Jatin

1

Ecco una versione che mostra quanto grave è il blocco fallback.

Si noti che l'executor è a thread singolo e si stanno creando molti fallback.

@cmbaxter ha ragione, il timeout principale non dovrebbe essere timeout * futures.size, dovrebbe essere più grande!

@cmbaxter ha anche ragione che si desidera pensare non bloccante. Una volta che lo fai, e vuoi imporre dei timeout, allora sceglierai un componente timer per quello, vedi la sua risposta collegata (anch'essa collegata dalla tua risposta collegata).

Detto questo, mi piace ancora lo my answer from your link, nella misura in cui sedersi in attesa che la prossima cosa che dovrebbe scadere è davvero semplice.

Richiede solo un elenco di future, i loro timeout e un valore di riserva.

Forse c'è un caso d'uso per questo, come ad esempio una semplice applicazione che a pochi isolati per alcuni risultati (come il test) e non devono uscire prima che i risultati sono in

import scala.concurrent._ 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext 

import java.util.concurrent.Executors 
import java.lang.System.{ nanoTime => now } 

object Test extends App { 
    //implicit val xc = ExecutionContext.global 
    implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor) 

    def timed[A](body: =>A): A = { 
    val start = now 
    val res = body 
    val end = now 
    Console println (Duration fromNanos end-start).toMillis + " " + res 
    res 
    } 
    println("Creating futureList") 

    val timeout = 1500 millis 
    val futures = List(1000, 1500, 1200, 800, 2000) map { ms => 
    val f = future { 
     timed { 
     blocking(Thread sleep ms) 
     ms toString 
     } 
    } 
    Future firstCompletedOf Seq(f, fallback(timeout)) 
    } 

    println("Creating waitinglist") 
    val waitingList = Future sequence futures 
    println("Created") 

    timed { 
    val results = Await result (waitingList, 2 * timeout * futures.size) 
    println(results) 
    }  
    xc.shutdown 

    def fallback(timeout: Duration) = future { 
    timed { 
     blocking(Thread sleep (timeout toMillis)) 
     "-1" 
    } 
    } 
} 

Che cosa è successo:.

Creating futureList 
Creating waitinglist 
Created 
1001 1000 
1500 -1 
1500 1500 
1500 -1 
1200 1200 
1500 -1 
800 800 
1500 -1 
2000 2000 
1500 -1 
List(1000, 1500, 1200, 800, 2000) 
14007()