2010-03-15 8 views
13

Ho un flusso di controllo asincrono come la seguente:Come dividere e inviare un flusso di controllo asincrono usando Continuazioni?

ActorA ! DoA(dataA, callback1, callbackOnErrorA) 

def callback1() = { 
    ... 
    ActorB ! DoB(dataB, callback2, callbackOnErrorB) 
} 

def callback2() = { 
    ActorC ! DoC(dataC, callback3, callbackOnErrorC) 
} 

... 

Come avrei dividere questo flusso in più parti (continuazioni) e sequenzialmente inviare questi per diversi attori (o thread/task) pur mantenendo lo stato complessivo ?

Qualsiasi suggerimento apprezzato, Grazie

risposta

7

Questo è molto semplificata, ma mostra come suddividere un unico flusso di controllo tra tre attori, passando allo stato insieme a ciascuno:

package blevins.example 

import scala.continuations._ 
import scala.continuations.ControlContext._ 
import scala.actors.Actor._ 
import scala.actors._ 

object App extends Application { 

    val actorA, actorB, actorC = actor { 
    receive { 
     case f: Function1[Unit,Unit] => { f() } 
    } 
    } 

    def handle(a: Actor) = shift { k: (Unit=>Unit) => 
    a ! k 
    } 

    // Control flow to split up 
    reset { 
     // this is not handled by any actor 
     var x = 1 
     println("a: " + x) 

     handle(actorA) // actorA handles the below 
     x += 4 
     println("b: " + x) 

     handle(actorB) // then, actorB handles the rest 
     var y = 2 
     x += 2 
     println("c: " + x) 

     handle(actorC) // and so on... 
     y += 1 
     println("d: " + x + ":" + y) 
    } 

} 
+0

Grande, grazie! – hotzen

9

mi piace usare scalaz.concurrent.Promise. Questo esempio non è esattamente come quello nella tua domanda, ma ti dà l'idea.

object Async extends Application { 
    import scalaz._ 
    import Scalaz._ 
    import concurrent._ 
    import concurrent.strategy._ 
    import java.util.concurrent.{ExecutorService, Executors} 

    case class ResultA(resultb: ResultB, resulta: ResultC) 
    case class ResultB() 
    case class ResultC() 

    run 

    def run { 
    implicit val executor: ExecutorService = Executors.newFixedThreadPool(8) 
    import Executor.strategy 

    val promiseA = doA 
    println("waiting for results") 
    val a: ResultA = promiseA.get 
    println("got " + a) 
    executor.shutdown  
    } 

    def doA(implicit s: Strategy[Unit]): Promise[ResultA] = { 
    println("triggered A") 
    val b = doB 
    val c = doC 
    for {bb <- b; cc <- c} yield ResultA(bb, cc) 
    } 

    def doB(implicit s: Strategy[Unit]): Promise[ResultB] = { 
    println("triggered B") 
    promise { Thread.sleep(1000); println("returning B"); ResultB() } 
    } 

    def doC(implicit s: Strategy[Unit]): Promise[ResultC] = { 
    println("triggered C") 
    promise { Thread.sleep(1000); println("returning C"); ResultC() } 
    } 
} 

uscita:

triggered A 
triggered B 
triggered C 
waiting for results 
returning B 
returning C 
got ResultA(ResultB(),ResultC()) 

Troverete un'introduzione Scalaz concorrenza in questo presentation da Runar.

Questo approccio non è flessibile come gli attori, ma si compone meglio e non può deadlock.

+0

Nice Scalaz Promise-Esempio, grazie. Comunque voglio avere una comprensione più profonda della nuova cosa di Scala2.8-CPS e apprezzerei una risposta specifica per CPS. – hotzen

+0

+1 per menzionare i vantaggi di Futures over Actors e l'uso di valute implicite per definire le strategie. – thSoft